Class: ChefEc2NodeRm::SqsPollers

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/chef_ec2_node_rm/sqs_pollers.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger, #logger_device, logger_device

Constructor Details

#initialize(urls) ⇒ SqsPollers

Returns a new instance of SqsPollers.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/chef_ec2_node_rm/sqs_pollers.rb', line 7

def initialize(urls)
  @pollers = urls.map do |url|
    begin
      logger.debug(url) { 'Verifying queue' }
      # Each queue is verified by making a request for its attributes. An
      # exception should get raised for any queue that is non-existent or
      # otherwise unavailable.
      Aws::SQS::Queue.new(url).attributes
      Aws::SQS::QueuePoller.new(url)
    rescue Aws::Errors::ServiceError => e
      logger.error(url) { e.message }
      nil
    end
  end.compact
end

Instance Attribute Details

#pollersObject (readonly)

Returns the value of attribute pollers.



5
6
7
# File 'lib/chef_ec2_node_rm/sqs_pollers.rb', line 5

def pollers
  @pollers
end

Instance Method Details

#startObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/chef_ec2_node_rm/sqs_pollers.rb', line 23

def start
  trap('SIGINT') { Thread.new { exit } }
  @pollers.map do |poller|
    Thread.new do
      Thread.current.name = poller.queue_url
      logger.info(Thread.current.name) { 'Starting poller' }
      poller.poll(skip_delete: true) do |msg|
        logger.debug(Thread.current.name) { %(Message received: id='#{msg.message_id}' body='#{msg.body.delete("\n")}') }
        yield(poller, msg)
      end
    end
  end.each(&:join)
  logger.info('No queues left to poll')
end