Class: EventHub::Adapters::Aws
- Inherits: Object
- Inheritance chain: Object → EventHub::Adapters::Aws
- Defined in:
- lib/event_hub_aws/adapters/aws.rb,
lib/event_hub_aws/adapters/aws/message.rb
Defined Under Namespace
Classes: Message
Instance Attribute Summary
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary
- #delete_message(receipt_handle) ⇒ Object
- #fifo_exchange? ⇒ Boolean
-
#initialize(config) ⇒ Aws
constructor
A new instance of Aws.
- #publish(event) ⇒ Object
- #setup_bindings ⇒ Object
- #sns ⇒ Object
- #sqs ⇒ Object
- #subscribe(&block) ⇒ Object
- #topic ⇒ Object
Constructor Details
#initialize(config) ⇒ Aws
11 12 13 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 11
#
# Creates the adapter and keeps a reference to the supplied configuration.
# The object is stored as given; nothing is copied or validated here.
#
# @param config [Object] adapter configuration
def initialize(config)
  @config = config
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
9 10 11 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 9
#
# Read-only accessor for the adapter configuration.
#
# @return [Object] the current value of @config
def config
  @config
end
Instance Method Details
#delete_message(receipt_handle) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 66
#
# Deletes a single message from the queue at @config[:queue_url].
#
# @param receipt_handle [Object] receipt handle of the message to delete;
#   passed to the SQS client unchanged
# @return [Object] whatever the SQS client's delete_message call returns
def delete_message(receipt_handle)
  sqs.delete_message(
    queue_url: @config[:queue_url],
    receipt_handle: receipt_handle
  )
end
#fifo_exchange? ⇒ Boolean
85 86 87 88 89 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 85
#
# Tells whether the configured exchange ARN ends in ".fifo".
# The answer is computed once and cached in @fifo_exchange; `defined?` is
# used for the cache check so a cached `false` is not recomputed.
#
# @return [Boolean] true when @config[:exchange_arn] ends with ".fifo"
def fifo_exchange?
  return @fifo_exchange if defined?(@fifo_exchange)

  # to_s makes a missing :exchange_arn read as "not FIFO" instead of
  # raising NoMethodError on nil.
  @fifo_exchange = @config[:exchange_arn].to_s.end_with?('.fifo')
end
#publish(event) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 33
#
# Publishes an event to the topic.
#
# The event body becomes the message, and the event class's `event` and
# `version` values are attached as String message attributes. When
# fifo_exchange? is true, a message group id and a random deduplication id
# are added as well.
#
# @param event [Object] must respond to `body`; its class must respond to
#   `event` and `version`
# @return [Object] whatever `topic.publish` returns
def publish(event)
  message = {
    message: event.body,
    message_attributes: {
      event: { data_type: 'String', string_value: event.class.event },
      version: { data_type: 'String', string_value: event.class.version }
    }
  }

  if fifo_exchange?
    # NOTE(review): every message is sent with the same literal group id;
    # presumably intentional (one ordered stream) - confirm.
    message.merge!(
      message_group_id: 'message_group_id',
      message_deduplication_id: SecureRandom.uuid
    )
  end

  topic.publish(message)
end
#setup_bindings ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 47
#
# Makes sure the topic has an SQS subscription for @config[:queue_arn] whose
# FilterPolicy lists the event names found as keys of @config[:subscribe].
# An existing subscription (matched on its 'Endpoint' attribute) gets its
# FilterPolicy overwritten; otherwise a new subscription is created.
#
# @return [Object] the result of the underlying set_attributes / subscribe call
def setup_bindings
  # A missing :subscribe entry is treated like an empty one.
  events = (@config[:subscribe] || {}).keys
  events = ['__nothing__'] if events.empty? # AWS doesn't allow blank filters
  policy = { event: events }.to_json

  subscription = topic.subscriptions.find do |candidate|
    candidate.attributes['Endpoint'] == @config[:queue_arn]
  end

  if subscription
    subscription.set_attributes({
      attribute_name: 'FilterPolicy',
      attribute_value: policy
    })
  else
    topic.subscribe({
      protocol: 'sqs',
      attributes: { 'FilterPolicy' => policy },
      endpoint: @config[:queue_arn]
    })
  end
end
#sns ⇒ Object
77 78 79 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 77
#
# Returns the SNS resource, creating and caching it in @sns on first use.
# It is constructed with @config[:credentials], or an empty hash when that
# entry is nil or false.
#
# @return [::Aws::SNS::Resource]
def sns
  return @sns if @sns

  options = @config[:credentials] || {}
  @sns = ::Aws::SNS::Resource.new(options)
end
#sqs ⇒ Object
81 82 83 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 81
#
# Returns the SQS client, creating and caching it in @sqs on first use.
# It is constructed with @config[:credentials], or an empty hash when that
# entry is nil or false.
#
# @return [::Aws::SQS::Client]
def sqs
  return @sqs if @sqs

  options = @config[:credentials] || {}
  @sqs = ::Aws::SQS::Client.new(options)
end
#subscribe(&block) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 15
#
# Polls the queue at @config[:queue_url] in an endless loop and calls the
# given block once per received message, passing a Message built from this
# adapter and the raw SQS message. The loop has no exit condition of its
# own.
#
# @yieldparam message [Message] wrapper around one received SQS message
# @return [Object] does not return unless the loop is interrupted
def subscribe(&block)
  loop do
    response = sqs.receive_message(
      queue_url: @config[:queue_url],
      message_attribute_names: ['All'], # Receive all custom attributes.
      max_number_of_messages: 10,       # Receive up to ten messages per poll.
      wait_time_seconds: 15             # Long-poll: wait up to 15 seconds for messages.
    )

    # Wrap each raw message and hand it to the caller's block.
    response.messages.each do |aws_msg|
      message = Message.new(self, aws_msg)
      block.call(message)
    end
  end
end
#topic ⇒ Object
73 74 75 |
# File 'lib/event_hub_aws/adapters/aws.rb', line 73
#
# Returns the topic object for @config[:exchange_arn], looked up through
# `sns` on first use and cached in @topic afterwards.
#
# @return [Object] the value returned by `sns.topic`
def topic
  return @topic if @topic

  @topic = sns.topic(@config[:exchange_arn])
end