Class: Maitredee::Adapters::SnsSqsAdapter
- Inherits:
-
BaseAdapter
- Object
- BaseAdapter
- Maitredee::Adapters::SnsSqsAdapter
- Defined in:
- lib/maitredee/adapters/sns_sqs_adapter.rb
Instance Attribute Summary collapse
-
#access_key_id ⇒ Object
readonly
Returns the value of attribute access_key_id.
-
#region ⇒ Object
readonly
Returns the value of attribute region.
-
#secret_access_key ⇒ Object
readonly
Returns the value of attribute secret_access_key.
Instance Method Summary collapse
- #add_worker(subscriber_class) ⇒ Object
-
#configure_broker(config) ⇒ Object
creates topics from keys and queues from values, and subscribes queues to topics.
-
#initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil) ⇒ SnsSqsAdapter
constructor
A new instance of SnsSqsAdapter.
-
#publish(message) ⇒ Object
publishes message to SNS.
-
#subscribe(queue_resource_name:, topic_resource_name:) ⇒ Object
subscribes a queue to a topic.
Constructor Details
#initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil) ⇒ SnsSqsAdapter
Returns a new instance of SnsSqsAdapter.
15 16 17 18 19 20 21 22 |
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 15 def initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil) @access_key_id = access_key_id || ENV["MAITREDEE_AWS_ACCESS_KEY_ID"] @secret_access_key = secret_access_key || ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"] @region = region || ENV["MAITREDEE_AWS_REGION"] @default_shoryuken_options = Shoryuken.sqs_client = sqs_client end |
Instance Attribute Details
#access_key_id ⇒ Object (readonly)
Returns the value of attribute access_key_id.
7 8 9 |
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 7 def access_key_id @access_key_id end |
#region ⇒ Object (readonly)
Returns the value of attribute region.
7 8 9 |
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 7 def region @region end |
#secret_access_key ⇒ Object (readonly)
Returns the value of attribute secret_access_key.
7 8 9 |
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 7 def secret_access_key @secret_access_key end |
Instance Method Details
#add_worker(subscriber_class) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 107 def add_worker(subscriber_class) worker_name = "#{subscriber_class.name}Worker" worker_class = self.class.const_defined?(worker_name) unless worker_class worker_class = Class.new(Worker) worker_class. .merge( queue: subscriber_class.queue_resource_name ) worker_class.subscriber_class = subscriber_class self.class.const_set worker_name, worker_class end worker_class end |
#configure_broker(config) ⇒ Object
creates topics from keys and queues from values, and subscribes queues to topics
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 45 def configure_broker(config) config.each do |topic_resource_name, queue_resource_names| queue_resource_names.each do |queue_resource_name| subscribe( topic_resource_name: topic_resource_name, queue_resource_name: queue_resource_name ) end end end |
#publish(message) ⇒ Object
publishes message to SNS
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 26 def publish() = { message_id: ., topic_name: .topic_name, event_name: .event_name, primary_key: .primary_key, schema_name: .schema_name, maitredee_version: Maitredee::VERSION }.compact sns_client.publish( topic_arn: topics[.topic_resource_name].arn, message: JSON.dump(.body), message_attributes: () ) end |
#subscribe(queue_resource_name:, topic_resource_name:) ⇒ Object
subscribes a queue to a topic
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/maitredee/adapters/sns_sqs_adapter.rb', line 82 def subscribe(queue_resource_name:, topic_resource_name:) topic = topics[topic_resource_name] queue = queues[queue_resource_name] queue_arn = queue.attributes["QueueArn"] resp = sns_client.subscribe( topic_arn: topic.arn, protocol: "sqs", endpoint: queue_arn, attributes: { "RawMessageDelivery" => "true" } ) subscriptions[resp.subscription_arn] = Aws::SNS::Subscription.new(resp.subscription_arn, client: sns_client) queue.set_attributes( attributes: { "Policy" => sqs_policy( queue_arn: queue_arn, topic_arn: topic.arn ) } ) end |