Class: EventHub::Adapters::Aws

Inherits:
Object
  • Object
show all
Defined in:
lib/event_hub_aws/adapters/aws.rb,
lib/event_hub_aws/adapters/aws/message.rb

Defined Under Namespace

Classes: Message

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Aws



11
12
13
# File 'lib/event_hub_aws/adapters/aws.rb', line 11

def initialize(config)
  @config = config
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



9
10
11
# File 'lib/event_hub_aws/adapters/aws.rb', line 9

def config
  @config
end

Instance Method Details

#delete_message(receipt_handle) ⇒ Object



66
67
68
69
70
71
# File 'lib/event_hub_aws/adapters/aws.rb', line 66

def delete_message(receipt_handle)
  sqs.delete_message(
    queue_url: @config[:queue_url],
    receipt_handle: receipt_handle
  )
end

#fifo_exchange?Boolean



85
86
87
88
89
# File 'lib/event_hub_aws/adapters/aws.rb', line 85

def fifo_exchange?
  return @fifo_exchange if defined?(@fifo_exchange)

  @fifo_exchange = @config[:exchange_arn].end_with?('.fifo')
end

#publish(event) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/event_hub_aws/adapters/aws.rb', line 33

def publish(event)
  message = {
    message: event.body,
    message_attributes: {
      event: { data_type: 'String', string_value: event.class.event },
      version: { data_type: 'String', string_value: event.class.version },
    },
  }
  if fifo_exchange?
    message.merge!(message_group_id: 'message_group_id', message_deduplication_id: SecureRandom.uuid)
  end
  topic.publish(message)
end

#setup_bindingsObject



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/event_hub_aws/adapters/aws.rb', line 47

def setup_bindings
  events = @config[:subscribe].keys
  events = ['__nothing__'] if events.empty? # AWS doesn't allow blank filters
  policy = { event: events }.to_json
  subscription = topic.subscriptions.find { |s| s.attributes['Endpoint'] == @config[:queue_arn] }
  if subscription
    subscription.set_attributes({
                                  attribute_name: 'FilterPolicy',
                                  attribute_value: policy
                                })
  else
    topic.subscribe({
                      protocol: 'sqs',
                      attributes: { 'FilterPolicy' => policy },
                      endpoint: @config[:queue_arn]
                    })
  end
end

#snsObject



77
78
79
# File 'lib/event_hub_aws/adapters/aws.rb', line 77

def sns
  @sns ||= ::Aws::SNS::Resource.new(@config[:credentials] || {})
end

#sqsObject



81
82
83
# File 'lib/event_hub_aws/adapters/aws.rb', line 81

def sqs
  @sqs ||= ::Aws::SQS::Client.new(@config[:credentials] || {})
end

#subscribe(&block) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/event_hub_aws/adapters/aws.rb', line 15

def subscribe(&block)
  loop do
    receive_message_result = sqs.receive_message(
      queue_url: @config[:queue_url],
      message_attribute_names: ['All'], # Receive all custom attributes.
      max_number_of_messages: 10, # Receive at most one message.
      wait_time_seconds: 15 # Do not wait to check for the message.
    )

    # Display information about the message.
    # Display the message's body and each custom attribute value.
    receive_message_result.messages.each do |aws_msg|
      message = Message.new(self, aws_msg)
      block.call(message)
    end
  end
end

#topicObject



73
74
75
# File 'lib/event_hub_aws/adapters/aws.rb', line 73

def topic
  @topic ||= sns.topic(@config[:exchange_arn])
end