Class: Maitredee::Adapters::SnsSqsAdapter

Inherits:
BaseAdapter
  • Object
show all
Defined in:
lib/maitredee/adapters/sns_sqs_adapter.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil) ⇒ SnsSqsAdapter

Returns a new instance of SnsSqsAdapter.

Parameters:

  • access_key_id (String) (defaults to: nil)

    if nil will look in ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]

  • secret_access_key (String) (defaults to: nil)

    if nil will look in ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]

  • region (String) (defaults to: nil)

    if nil will look in ENV["MAITREDEE_AWS_REGION"]

  • default_shoryuken_options (Hash) (defaults to: nil)

    Default options for the Shoryuken job listening to the queues; defaults to { body_parser: :json, auto_delete: true }.



15
16
17
18
19
20
21
22
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 15

# Builds the adapter and installs its SQS client on Shoryuken.
#
# Each AWS setting uses the explicit keyword argument when it is truthy and
# otherwise falls back to the matching MAITREDEE_AWS_* environment variable.
#
# @param access_key_id [String, nil] falls back to ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]
# @param secret_access_key [String, nil] falls back to ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]
# @param region [String, nil] falls back to ENV["MAITREDEE_AWS_REGION"]
# @param default_shoryuken_options [Hash, nil] stored as given (no fallback is applied here)
def initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil)
  @default_shoryuken_options = default_shoryuken_options
  @region = region || ENV.fetch("MAITREDEE_AWS_REGION", nil)
  @secret_access_key = secret_access_key || ENV.fetch("MAITREDEE_AWS_SECRET_ACCESS_KEY", nil)
  @access_key_id = access_key_id || ENV.fetch("MAITREDEE_AWS_ACCESS_KEY_ID", nil)

  # Must run after the assignments above: sqs_client presumably reads the
  # credentials/region stored on this instance — confirm against its definition.
  Shoryuken.sqs_client = sqs_client
end

Instance Attribute Details

#access_key_id ⇒ Object (readonly)

Returns the value of attribute access_key_id.



7
8
9
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 7

# Read-only accessor for the AWS access key id stored by #initialize
# (the explicit argument, or ENV["MAITREDEE_AWS_ACCESS_KEY_ID"] as a fallback).
#
# @return [String, nil]
def access_key_id
  @access_key_id
end

#region ⇒ Object (readonly)

Returns the value of attribute region.



7
8
9
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 7

# Read-only accessor for the AWS region stored by #initialize
# (the explicit argument, or ENV["MAITREDEE_AWS_REGION"] as a fallback).
#
# @return [String, nil]
def region
  @region
end

#secret_access_key ⇒ Object (readonly)

Returns the value of attribute secret_access_key.



7
8
9
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 7

# Read-only accessor for the AWS secret access key stored by #initialize
# (the explicit argument, or ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"] as a fallback).
#
# @return [String, nil]
def secret_access_key
  @secret_access_key
end

Instance Method Details

#add_worker(subscriber_class) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 107

# Returns the Worker subclass bound to +subscriber_class+, creating and
# registering it as a constant on the adapter class the first time it is
# requested.
#
# The constant is named "<subscriber class name>Worker". A newly created worker
# is configured with default_shoryuken_options merged with the subscriber's
# queue name, and is given a back-reference to the subscriber class.
#
# @param subscriber_class [Class] must respond to +name+ and +queue_resource_name+
# @return [Class] the worker class (the same object on every call for a given subscriber)
def add_worker(subscriber_class)
  worker_name = "#{subscriber_class.name}Worker"

  # const_defined? only answers true/false, so look the constant up explicitly;
  # otherwise repeat calls would return `true` instead of the worker class.
  return self.class.const_get(worker_name) if self.class.const_defined?(worker_name)

  worker_class = Class.new(Worker)
  worker_class.shoryuken_options default_shoryuken_options.merge(
    queue: subscriber_class.queue_resource_name
  )
  worker_class.subscriber_class = subscriber_class
  self.class.const_set worker_name, worker_class
  worker_class
end

#configure_broker(config) ⇒ Object

creates topics from keys and queues from values, and subscribes queues to topics

Parameters:

  • config (Hash{String => Array<String>})


45
46
47
48
49
50
51
52
53
54
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 45

# Walks a topic => queues mapping and subscribes every listed queue to its topic
# by delegating each pair to #subscribe.
#
# @param config [Hash{String => Array<String>}] topic resource name => queue resource names
# @return [Hash] the +config+ that was passed in
def configure_broker(config)
  config.each do |topic_name, queue_names|
    queue_names.each do |queue_name|
      subscribe(queue_resource_name: queue_name, topic_resource_name: topic_name)
    end
  end
end

#publish(message) ⇒ Object

publishes message to SNS

Parameters:



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 26

# Publishes +message+ to the SNS topic registered under its topic_resource_name.
#
# The body is serialized with JSON.dump. The message's id, topic name, event
# name, primary key and schema name, plus the Maitredee version, are sent as
# message attributes; attributes whose value is nil are left out.
#
# @param message [#message_id, #topic_name, #event_name, #primary_key,
#   #schema_name, #topic_resource_name, #body]
# @return [Object] whatever sns_client.publish returns
def publish(message)
  attribute_keys = %i[message_id topic_name event_name primary_key schema_name]

  attributes = attribute_keys.map { |key| [key, message.public_send(key)] }.to_h
  attributes[:maitredee_version] = Maitredee::VERSION
  attributes = attributes.reject { |_key, value| value.nil? }

  sns_client.publish(
    topic_arn: topics[message.topic_resource_name].arn,
    message: JSON.dump(message.body),
    message_attributes: sns_message_attributes(attributes)
  )
end

#subscribe(queue_resource_name:, topic_resource_name:) ⇒ Object

subscribes a queue to a topic

Parameters:

  • queue_resource_name (String)
  • topic_resource_name (String)


82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 82

# Subscribes an SQS queue to an SNS topic.
#
# Steps, in order:
# 1. looks up the topic and queue by resource name,
# 2. creates an "sqs" subscription with RawMessageDelivery set to "true",
# 3. records an Aws::SNS::Subscription under its ARN in +subscriptions+,
# 4. sets the queue's "Policy" attribute to the result of sqs_policy for this
#    queue/topic pair.
#
# @param queue_resource_name [String]
# @param topic_resource_name [String]
# @return [Object] whatever queue.set_attributes returns
def subscribe(queue_resource_name:, topic_resource_name:)
  sns_topic = topics[topic_resource_name]
  sqs_queue = queues[queue_resource_name]
  queue_arn = sqs_queue.attributes["QueueArn"]

  subscribe_params = {
    topic_arn: sns_topic.arn,
    protocol: "sqs",
    endpoint: queue_arn,
    attributes: { "RawMessageDelivery" => "true" }
  }
  response = sns_client.subscribe(**subscribe_params)

  subscriptions[response.subscription_arn] =
    Aws::SNS::Subscription.new(response.subscription_arn, client: sns_client)

  policy = sqs_policy(queue_arn: queue_arn, topic_arn: sns_topic.arn)
  sqs_queue.set_attributes(attributes: { "Policy" => policy })
end