Class: EventQ::Amazon::SubscriptionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq/eventq_aws/aws_subscription_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ SubscriptionManager

Returns a new instance of SubscriptionManager.



6
7
8
9
10
11
12
13
# File 'lib/eventq/eventq_aws/aws_subscription_manager.rb', line 6

# Builds a subscription manager from the supplied options.
#
# @param options [Hash] must contain the :client and :queue_manager keys
# @raise [RuntimeError] when one or both of the required keys are absent
def initialize(options)
  supplied = options.keys
  absent = %i[client queue_manager].reject { |name| supplied.include?(name) }
  unless absent.empty?
    raise "[#{self.class}] - Missing options #{absent} must be specified."
  end

  @manager = options[:queue_manager]
  @client = options[:client]
end

Instance Method Details

#subscribe(event_type, queue, topic_region = nil, queue_region = nil, topic_namespaces = [EventQ.namespace]) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/eventq/eventq_aws/aws_subscription_manager.rb', line 15

# Subscribes a queue to the SNS topic of an event type, once for every
# namespace in +topic_namespaces+.
#
# @param event_type event type handed to the SNS helper to resolve the topic ARN
# @param queue [#isolated, #name] queue to subscribe; when +isolated+ is truthy
#   the topic is looked up with +get_topic_arn+ and no topic is created
# @param topic_region region passed to the SNS helper and SNS client (may be nil)
# @param queue_region region passed to +configure_queue+ (may be nil)
# @param topic_namespaces [#each] namespaces to subscribe under; defaults to
#   the current +EventQ.namespace+ only
# @return [true] always, once every namespace has been processed
# @raise [Exceptions::EventTypeNotFound] when the helper returns no topic ARN
def subscribe(event_type, queue, topic_region = nil, queue_region = nil, topic_namespaces = [EventQ.namespace])
  # isolated queues call get_topic_arn; all other queues call create_topic_arn
  method = queue.isolated ? :get_topic_arn : :create_topic_arn

  topic_arn = @client.sns_helper(topic_region).public_send(method, event_type, topic_region)
  raise Exceptions::EventTypeNotFound, "SNS topic not found, unable to subscribe to #{event_type}" unless topic_arn

  queue_arn = configure_queue(queue, queue_region)

  # subscribe the queue to the topic with the namespaces provided
  topic_namespaces.each do |namespace|
    # derive the per-namespace ARN by swapping the current namespace prefix in the resolved ARN
    namespaced_topic_arn = topic_arn.gsub(":#{EventQ.namespace}-", ":#{namespace}-")

    # create the sns topic - this method is idempotent & returns the topic arn if it already exists
    # NOTE(review): unlike the lookup above, this call does not pass topic_region - confirm that is intended
    @client.sns_helper.create_topic_arn("#{namespace}-#{event_type}".delete('.')) unless queue.isolated

    # skip subscribe if subscription for given queue/topic already exists
    # this is a workaround for a localstack issue: https://github.com/localstack/localstack/issues/933
    # `next` (not `return`): an existing subscription for one namespace must not
    # prevent the remaining namespaces from being subscribed.
    next if existing_subscription?(queue_arn, namespaced_topic_arn)

    EventQ.logger.debug do
      "[#{self.class} #subscribe] - Subscribing Queue: #{queue.name} to topic_arn: #{namespaced_topic_arn}, endpoint: #{queue_arn}"
    end

    @client.sns(topic_region).subscribe(
      topic_arn: namespaced_topic_arn,
      protocol: 'sqs',
      endpoint: queue_arn
    )
  end

  true
end

#unsubscribe(_queue) ⇒ Object



52
53
54
# File 'lib/eventq/eventq_aws/aws_subscription_manager.rb', line 52

# Unsubscribing is not supported by this manager; calling it always raises.
#
# @param _queue ignored
# @raise [RuntimeError] always, directing the caller to the AWS Management Console
def unsubscribe(_queue)
  message = "[#{self.class}] - Not implemented. Please unsubscribe the queue from the topic inside the AWS Management Console."
  raise RuntimeError, message
end