Class: EventQ::Amazon::QueueManager
- Inherits: Object
- Inheritance chain: Object → EventQ::Amazon::QueueManager
- Defined in:
- lib/eventq/eventq_aws/aws_queue_manager.rb
Constant Summary collapse
- VISIBILITY_TIMEOUT = 'VisibilityTimeout'
- MESSAGE_RETENTION_PERIOD = 'MessageRetentionPeriod'
Instance Method Summary collapse
- #create_queue(queue) ⇒ Object
- #drop_queue(queue) ⇒ Object
- #drop_topic(event_type) ⇒ Object
- #get_queue(queue) ⇒ Object
- #initialize(options) ⇒ QueueManager (constructor): A new instance of QueueManager.
- #queue_attributes(queue) ⇒ Object
- #queue_exists?(queue) ⇒ Boolean
- #topic_exists?(event_type) ⇒ Boolean
- #update_queue(queue) ⇒ Object
Constructor Details
#initialize(options) ⇒ QueueManager
Returns a new instance of QueueManager.
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 10
#
# Builds a queue manager from an options hash.
#
# @param options [Hash] configuration keyed by symbol
# @option options [Object] :client (required) client wrapper; the other
#   methods of this class call +sqs_helper+ and +sns_helper+ on it
# @option options [Integer] :visibility_timeout (300) value later sent as the
#   'VisibilityTimeout' queue attribute; the default is 5 minutes
# @option options [Integer] :message_retention_period (1209600) value later
#   sent as the 'MessageRetentionPeriod' queue attribute; the default is
#   14 days (max aws value)
# @raise [RuntimeError] when a mandatory option key is absent
def initialize(options)
  mandatory = [:client]
  missing = mandatory - options.keys
  raise "[#{self.class}] - Missing options #{missing} must be specified." unless missing.empty?

  @client = options[:client]
  # `||` means an explicit nil (or false) also falls back to the default.
  @visibility_timeout = options[:visibility_timeout] || 300 # 5 minutes
  @message_retention_period = options[:message_retention_period] || 1_209_600 # 14 days (max aws value)
end
Instance Method Details
#create_queue(queue) ⇒ Object
29 30 31 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 29
#
# Creates +queue+ through the client's SQS helper, passing along the
# attribute hash built by #queue_attributes.
#
# @param queue [Object] queue definition, forwarded unchanged to the helper
# @return [Object] whatever the helper's +create_queue+ returns
def create_queue(queue)
  helper = @client.sqs_helper
  helper.create_queue(queue, queue_attributes(queue))
end
#drop_queue(queue) ⇒ Object
33 34 35 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 33
#
# Delegates removal of +queue+ to the client's SQS helper.
#
# @param queue [Object] queue definition, forwarded unchanged to the helper
# @return [Object] whatever the helper's +drop_queue+ returns
def drop_queue(queue)
  helper = @client.sqs_helper
  helper.drop_queue(queue)
end
#drop_topic(event_type) ⇒ Object
37 38 39 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 37
#
# Delegates removal of the topic for +event_type+ to the client's SNS helper.
#
# @param event_type [Object] event type, forwarded unchanged to the helper
# @return [Object] whatever the helper's +drop_topic+ returns
def drop_topic(event_type)
  helper = @client.sns_helper
  helper.drop_topic(event_type)
end
#get_queue(queue) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 21
#
# Ensures +queue+ exists with current attributes: updates it when
# #queue_exists? reports it is present, creates it otherwise. When the queue
# declares a dead-letter queue (+queue.dlq+), that queue gets the same
# update-or-create treatment first.
#
# @param queue [Object] queue definition responding to +dlq+
# @return [Object] the result of #update_queue or #create_queue for +queue+
def get_queue(queue)
  # Dead-letter queue is handled before the main queue.
  # NOTE(review): presumably so the ARN lookup in #queue_attributes can
  # resolve the DLQ when the main queue is configured - confirm.
  if queue.dlq
    if queue_exists?(queue.dlq)
      update_queue(queue.dlq)
    else
      create_queue(queue.dlq)
    end
  end

  return update_queue(queue) if queue_exists?(queue)

  create_queue(queue)
end
#queue_attributes(queue) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 53
#
# Builds the attribute hash passed to the SQS helper when creating or
# updating +queue+.
#
# The hash always holds the configured visibility timeout and message
# retention period as strings. When +queue.dlq+ is set, a 'RedrivePolicy'
# JSON string is added that names the dead-letter queue's ARN and the
# queue's +max_receive_count+.
#
# @param queue [Object] queue definition responding to +dlq+ and
#   +max_receive_count+
# @return [Hash{String => String}] attribute name to value
def queue_attributes(queue)
  attributes = {}
  attributes[VISIBILITY_TIMEOUT] = @visibility_timeout.to_s
  attributes[MESSAGE_RETENTION_PERIOD] = @message_retention_period.to_s
  return attributes unless queue.dlq

  # The redrive policy refers to the dead-letter queue by ARN, so look it up.
  dlq_arn = @client.sqs_helper.get_queue_arn(queue.dlq)
  attributes['RedrivePolicy'] = %Q({"maxReceiveCount":"#{queue.max_receive_count}","deadLetterTargetArn":"#{dlq_arn}"})
  attributes
end
#queue_exists?(queue) ⇒ Boolean
45 46 47 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 45
#
# Reports whether the SQS helper can resolve a URL for +queue+.
#
# @param queue [Object] queue definition, forwarded unchanged to the helper
# @return [Boolean] true when +get_queue_url+ returns a truthy value,
#   false when it returns nil or false
def queue_exists?(queue)
  url = @client.sqs_helper.get_queue_url(queue)
  url ? true : false
end
#topic_exists?(event_type) ⇒ Boolean
41 42 43 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 41
#
# Reports whether the SNS helper can resolve a topic ARN for +event_type+.
#
# @param event_type [Object] event type, forwarded unchanged to the helper
# @return [Boolean] true when +get_topic_arn+ returns a truthy value,
#   false when it returns nil or false
def topic_exists?(event_type)
  arn = @client.sns_helper.get_topic_arn(event_type)
  arn ? true : false
end
#update_queue(queue) ⇒ Object
49 50 51 |
# File 'lib/eventq/eventq_aws/aws_queue_manager.rb', line 49
#
# Updates +queue+ through the client's SQS helper, passing along the
# attribute hash built by #queue_attributes.
#
# @param queue [Object] queue definition, forwarded unchanged to the helper
# @return [Object] whatever the helper's +update_queue+ returns
def update_queue(queue)
  helper = @client.sqs_helper
  helper.update_queue(queue, queue_attributes(queue))
end