Class: EventQ::Amazon::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq_aws/aws_queue_manager.rb

Constant Summary collapse

VISIBILITY_TIMEOUT =
'VisibilityTimeout'.freeze
MESSAGE_RETENTION_PERIOD =
'MessageRetentionPeriod'.freeze

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ QueueManager

Returns a new instance of QueueManager.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/eventq_aws/aws_queue_manager.rb', line 8

def initialize(options)

  if options[:client] == nil
    raise ':client (QueueClient) must be specified.'.freeze
  end

  @client = options[:client]

  @visibility_timeout = 300 #5 minutes
  if options.key?(:visibility_timeout)
    @visibility_timeout = options[:visibility_timeout]
  end

  @message_retention_period = 1209600 #14 days (max aws value)
  if options.key?(:message_retention_period)
    @message_retention_period = options[:message_retention_period]
  end

end

Instance Method Details

#create_queue(queue) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/eventq_aws/aws_queue_manager.rb', line 38

def create_queue(queue)
  _queue_name = EventQ.create_queue_name(queue.name)
  response = @client.sqs.create_queue({
                                          queue_name: _queue_name,
                                          attributes: {
                                              VISIBILITY_TIMEOUT => @visibility_timeout.to_s,
                                              MESSAGE_RETENTION_PERIOD => @message_retention_period.to_s
                                          }
                                      })

  return response.queue_url
end

#drop_queue(queue) ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/eventq_aws/aws_queue_manager.rb', line 51

def drop_queue(queue)

  q = get_queue(queue)

  @client.sqs.delete_queue({ queue_url: q})

  return true

end

#drop_topic(event_type) ⇒ Object



61
62
63
64
65
66
# File 'lib/eventq_aws/aws_queue_manager.rb', line 61

def drop_topic(event_type)
  topic_arn = @client.get_topic_arn(event_type)
  @client.sns.delete_topic({ topic_arn: topic_arn})

  return true
end

#get_queue(queue) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/eventq_aws/aws_queue_manager.rb', line 28

def get_queue(queue)

  if queue_exists?(queue)
    update_queue(queue)
  else
    create_queue(queue)
  end

end

#queue_exists?(queue) ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
82
# File 'lib/eventq_aws/aws_queue_manager.rb', line 79

def queue_exists?(queue)
  _queue_name = EventQ.create_queue_name(queue.name)
  return @client.sqs.list_queues({ queue_name_prefix: _queue_name }).queue_urls.length > 0
end

#topic_exists?(event_type) ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
71
72
73
74
75
76
77
# File 'lib/eventq_aws/aws_queue_manager.rb', line 68

def topic_exists?(event_type)
  topic_arn = @client.get_topic_arn(event_type)

  begin
  @client.sns.get_topic_attributes({ topic_arn: topic_arn })
  rescue
    return false
  end
  return true
end

#update_queue(queue) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
# File 'lib/eventq_aws/aws_queue_manager.rb', line 84

def update_queue(queue)
  url = @client.get_queue_url(queue)
  @client.sqs.set_queue_attributes({
                                       queue_url: url, # required
                                        attributes: {
                                            VISIBILITY_TIMEOUT => @visibility_timeout.to_s,
                                            MESSAGE_RETENTION_PERIOD => @message_retention_period.to_s
                                        }
                                    })
  return url
end