Class: EventQ::Amazon::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq/eventq_aws/aws_queue_manager.rb

Constant Summary collapse

# Attribute key under which #queue_attributes stores the visibility timeout.
VISIBILITY_TIMEOUT = 'VisibilityTimeout'
# Attribute key under which #queue_attributes stores the message retention period.
MESSAGE_RETENTION_PERIOD = 'MessageRetentionPeriod'

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ QueueManager

Returns a new instance of QueueManager.



10
11
12
13
14
15
16
17
18
19
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 10

# Builds a queue manager from an options hash.
#
# @param options [Hash] must contain the :client key; may contain
#   :visibility_timeout and :message_retention_period overrides
# @raise [RuntimeError] when the :client key is absent from +options+
def initialize(options)
  absent = [:client] - options.keys
  unless absent.empty?
    raise "[#{self.class}] - Missing options #{absent} must be specified."
  end

  @client = options[:client]
  # Defaults: 300 seconds (5 minutes) and 1,209,600 seconds (14 days, max aws value).
  @visibility_timeout = options[:visibility_timeout] || 300
  @message_retention_period = options[:message_retention_period] || 1_209_600
end

Instance Method Details

#create_queue(queue) ⇒ Object



29
30
31
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 29

# Creates +queue+ through the client's SQS helper, passing along the
# attributes produced by #queue_attributes.
#
# @param queue [Object] queue definition handed to the helper unchanged
# @return [Object] whatever the helper's create_queue call returns
def create_queue(queue)
  helper = @client.sqs_helper
  settings = queue_attributes(queue)
  helper.create_queue(queue, settings)
end

#drop_queue(queue) ⇒ Object



33
34
35
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 33

# Drops +queue+ by delegating to the client's SQS helper.
#
# @param queue [Object] queue definition handed to the helper unchanged
# @return [Object] whatever the helper's drop_queue call returns
def drop_queue(queue)
  helper = @client.sqs_helper
  helper.drop_queue(queue)
end

#drop_topic(event_type) ⇒ Object



37
38
39
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 37

# Drops the topic for +event_type+ by delegating to the client's SNS helper.
#
# @param event_type [Object] event type handed to the helper unchanged
# @return [Object] whatever the helper's drop_topic call returns
def drop_topic(event_type)
  helper = @client.sns_helper
  helper.drop_topic(event_type)
end

#get_queue(queue) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 21

# Ensures +queue+ (and its dead-letter queue, when it has one) is in place:
# a queue that already exists is updated, otherwise it is created. The
# dead-letter queue is handled before the queue itself.
#
# @param queue [Object] queue definition responding to +dlq+
# @return [Object] the result of the update or create call made for +queue+
def get_queue(queue)
  if queue.dlq
    if queue_exists?(queue.dlq)
      update_queue(queue.dlq)
    else
      create_queue(queue.dlq)
    end
  end

  return update_queue(queue) if queue_exists?(queue)

  create_queue(queue)
end

#queue_attributes(queue) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 53

# Builds the attribute hash sent along when creating or updating +queue+.
#
# The visibility timeout and message retention period are always included,
# as strings. When the queue has a dead-letter queue, a 'RedrivePolicy'
# entry is added: a JSON string holding the queue's max receive count and
# the ARN the SQS helper reports for the dead-letter queue.
#
# @param queue [Object] queue definition responding to +dlq+ and
#   +max_receive_count+
# @return [Hash{String => String}] attribute names mapped to string values
def queue_attributes(queue)
  attributes = {}
  attributes[VISIBILITY_TIMEOUT] = @visibility_timeout.to_s
  attributes[MESSAGE_RETENTION_PERIOD] = @message_retention_period.to_s
  return attributes unless queue.dlq

  dlq_arn = @client.sqs_helper.get_queue_arn(queue.dlq)
  attributes['RedrivePolicy'] = %Q({"maxReceiveCount":"#{queue.max_receive_count}","deadLetterTargetArn":"#{dlq_arn}"})
  attributes
end

#queue_exists?(queue) ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 45

# Reports whether the client's SQS helper returns a queue URL for +queue+.
#
# @param queue [Object] queue definition handed to the helper unchanged
# @return [Boolean] true when the helper's get_queue_url result is truthy
def queue_exists?(queue)
  url = @client.sqs_helper.get_queue_url(queue)
  url ? true : false
end

#topic_exists?(event_type) ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 41

# Reports whether the client's SNS helper returns a topic ARN for +event_type+.
#
# @param event_type [Object] event type handed to the helper unchanged
# @return [Boolean] true when the helper's get_topic_arn result is truthy
def topic_exists?(event_type)
  if @client.sns_helper.get_topic_arn(event_type)
    true
  else
    false
  end
end

#update_queue(queue) ⇒ Object



49
50
51
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 49

# Updates +queue+ through the client's SQS helper, passing along the
# attributes produced by #queue_attributes.
#
# @param queue [Object] queue definition handed to the helper unchanged
# @return [Object] whatever the helper's update_queue call returns
def update_queue(queue)
  helper = @client.sqs_helper
  settings = queue_attributes(queue)
  helper.update_queue(queue, settings)
end