Class: ChefEc2NodeRm::SqsPollers
- Inherits:
- Object
- Object
- ChefEc2NodeRm::SqsPollers
- Includes:
- Logging
- Defined in:
- lib/chef_ec2_node_rm/sqs_pollers.rb
Instance Attribute Summary
- #pollers ⇒ Object
readonly
Returns the value of attribute pollers.
Instance Method Summary
- #initialize(urls) ⇒ SqsPollers
constructor
A new instance of SqsPollers.
- #start ⇒ Object
Methods included from Logging
#logger, logger, #logger_device, logger_device
Constructor Details
#initialize(urls) ⇒ SqsPollers
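Returns a new instance of SqsPollers holding one Aws::SQS::QueuePoller per queue URL that passes verification. URLs whose queues raise Aws::Errors::ServiceError are logged and skipped.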
# File 'lib/chef_ec2_node_rm/sqs_pollers.rb', line 7

def initialize(urls)
  @pollers = urls.map do |url|
    begin
      logger.debug(url) { 'Verifying queue' }
      # Each queue is verified by making a request for its attributes. An
      # exception should get raised for any queue that is non-existent or
      # otherwise unavailable.
      Aws::SQS::Queue.new(url).attributes
      Aws::SQS::QueuePoller.new(url)
    rescue Aws::Errors::ServiceError => e
      logger.error(url) { e.message }
      nil
    end
  end.compact
end
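The verify-then-compact pattern in the constructor can be tried without AWS credentials. The following is a minimal sketch, not the gem's code: `FakeQueue`, `FakeServiceError`, `verified_urls`, and the example URLs are hypothetical stand-ins introduced only for illustration, with `FakeQueue#attributes` playing the role of the attribute request that raises for an unavailable queue.

```ruby
# Hypothetical stand-in for Aws::Errors::ServiceError.
class FakeServiceError < StandardError; end

# Hypothetical stand-in for Aws::SQS::Queue: raises for URLs it does not know.
class FakeQueue
  KNOWN = ['https://sqs.example/queue-a', 'https://sqs.example/queue-b'].freeze

  def initialize(url)
    @url = url
  end

  def attributes
    raise FakeServiceError, "queue does not exist: #{@url}" unless KNOWN.include?(@url)

    { 'QueueArn' => "arn:fake:#{@url}" }
  end
end

# Same shape as SqsPollers#initialize: verify each URL, map failures to nil,
# then drop the nils with compact.
def verified_urls(urls)
  urls.map do |url|
    begin
      FakeQueue.new(url).attributes
      url
    rescue FakeServiceError => e
      warn "#{url}: #{e.message}"
      nil
    end
  end.compact
end

VERIFIED = verified_urls(['https://sqs.example/queue-a', 'https://sqs.example/missing'])
```

Because failures are mapped to `nil` and removed, one bad URL does not prevent the remaining queues from being polled. A caller may still want to check whether the resulting list is empty.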
Instance Attribute Details
#pollers ⇒ Object (readonly)
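Returns the value of attribute pollers: the Aws::SQS::QueuePoller instances created by the constructor for the queues that passed verification.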
# File 'lib/chef_ec2_node_rm/sqs_pollers.rb', line 5

def pollers
  @pollers
end
Instance Method Details
#start ⇒ Object
# File 'lib/chef_ec2_node_rm/sqs_pollers.rb', line 23

def start
  trap('SIGINT') { Thread.new { exit } }
  @pollers.map do |poller|
    Thread.new do
      Thread.current.name = poller.queue_url
      logger.info(Thread.current.name) { 'Starting poller' }
      poller.poll(skip_delete: true) do |msg|
        logger.debug(Thread.current.name) { %(Message received: id='#{msg.message_id}' body='#{msg.body.delete("\n")}') }
        yield(poller, msg)
      end
    end
  end.each(&:join)
  logger.info('No queues left to poll')
end
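Since `#start` polls with `skip_delete: true`, messages are not deleted automatically after the block returns, so the block passed to `#start` is responsible for acknowledging them (with the real SDK, via `Aws::SQS::QueuePoller#delete_message`). The sketch below illustrates that calling pattern under stated assumptions: `FakePoller`, `FakeMessage`, `start_all`, and the queue names are hypothetical stand-ins, and unlike the real poller, the fake one returns once its fixed message list is exhausted.

```ruby
# Hypothetical message type exposing the two fields the log line uses.
FakeMessage = Struct.new(:message_id, :body)

# Hypothetical stand-in for Aws::SQS::QueuePoller. It yields a fixed list of
# messages and then returns; the real poller keeps polling the queue.
class FakePoller
  attr_reader :queue_url, :deleted

  def initialize(queue_url, messages)
    @queue_url = queue_url
    @messages = messages
    @deleted = []
  end

  def poll(skip_delete: false)
    @messages.each do |msg|
      yield msg
      delete_message(msg) unless skip_delete
    end
  end

  def delete_message(msg)
    @deleted << msg.message_id
  end
end

# Same shape as SqsPollers#start: one thread per poller, each message handed
# to the caller's block together with its poller, all threads joined at the end.
def start_all(pollers)
  pollers.map do |poller|
    Thread.new do
      Thread.current.name = poller.queue_url
      poller.poll(skip_delete: true) { |msg| yield(poller, msg) }
    end
  end.each(&:join)
end

POLLERS = [
  FakePoller.new('queue-a', [FakeMessage.new('1', 'ok'), FakeMessage.new('2', 'bad')]),
  FakePoller.new('queue-b', [FakeMessage.new('3', 'ok')])
].freeze

# The block decides which messages to acknowledge; anything it does not
# delete stays on the queue.
start_all(POLLERS) do |poller, msg|
  poller.delete_message(msg) if msg.body == 'ok'
end
```

Passing the poller alongside the message is what lets the block delete from the correct queue when several queues are polled concurrently.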