Class: EventQ::Amazon::QueueWorker
- Inherits:
- 
      Object
      
        - Object
- EventQ::Amazon::QueueWorker
 
- Includes:
- WorkerId
- Defined in:
- lib/eventq/eventq_aws/aws_queue_worker.rb
Constant Summary collapse
- APPROXIMATE_RECEIVE_COUNT =
- 'ApproximateReceiveCount'
- MESSAGE =
- 'Message'
- AWS_MAX_VISIBILITY_TIMEOUT =
- 43_200 (12 hours, expressed in seconds)
Instance Attribute Summary collapse
- 
  
    
      #context  ⇒ Object 
    
    
  
  
  
  
    
    
  
  
  
  
  
  
    Returns the value of attribute context. 
Instance Method Summary collapse
- 
  
    
      #acknowledge_message(poller, msg)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Logic for the AWS SQS adapter when a message is accepted. 
- #configure(options = {}) ⇒ Object
- #deserialize_message(payload) ⇒ Object
- 
  
    
      #initialize  ⇒ QueueWorker 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    A new instance of QueueWorker. 
- #pre_process(context, options) ⇒ Object
- #serialize_message(msg) ⇒ Object
- #thread_process_iteration(queue, options, block) ⇒ Object
Methods included from WorkerId
#tag_processing_thread, #untag_processing_thread
Constructor Details
#initialize ⇒ QueueWorker
Returns a new instance of QueueWorker.
| 16 17 18 19 20 21 22 | # File 'lib/eventq/eventq_aws/aws_queue_worker.rb', line 16 def initialize @serialization_provider_manager = EventQ::SerializationProviders::Manager.new @signature_provider_manager = EventQ::SignatureProviders::Manager.new @calculate_visibility_timeout = Amazon::CalculateVisibilityTimeout.new( max_timeout: AWS_MAX_VISIBILITY_TIMEOUT ) end | 
Instance Attribute Details
#context ⇒ Object
Returns the value of attribute context.
| 14 15 16 | # File 'lib/eventq/eventq_aws/aws_queue_worker.rb', line 14 def context @context end | 
Instance Method Details
#acknowledge_message(poller, msg) ⇒ Object
Logic for the AWS SQS adapter when a message is accepted
| 76 77 78 | # File 'lib/eventq/eventq_aws/aws_queue_worker.rb', line 76 def acknowledge_message(poller, msg) poller.delete_message(msg) end | 
#configure(options = {}) ⇒ Object
| 69 70 71 72 73 | # File 'lib/eventq/eventq_aws/aws_queue_worker.rb', line 69 def configure(options = {}) options[:queue_poll_wait] ||= 10 EventQ.logger.info("[#{self.class}] - Configuring. Queue Poll Wait: #{options[:queue_poll_wait]}") end | 
#deserialize_message(payload) ⇒ Object
| 59 60 61 62 | # File 'lib/eventq/eventq_aws/aws_queue_worker.rb', line 59 def deserialize_message(payload) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) provider.deserialize(payload) end | 
#pre_process(context, options) ⇒ Object
| 24 25 26 | # File 'lib/eventq/eventq_aws/aws_queue_worker.rb', line 24 def pre_process(context, options) # don't do anything specific to set up the process before threads are fired off. end | 
#serialize_message(msg) ⇒ Object
| 64 65 66 67 | # File 'lib/eventq/eventq_aws/aws_queue_worker.rb', line 64 def serialize_message(msg) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) provider.serialize(msg) end | 
#thread_process_iteration(queue, options, block) ⇒ Object
| 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | # File 'lib/eventq/eventq_aws/aws_queue_worker.rb', line 28 def thread_process_iteration(queue, options, block) client = options[:client] manager = options[:manager] || EventQ::Amazon::QueueManager.new({ client: client }) # get the queue queue_url = manager.get_queue(queue) poller = Aws::SQS::QueuePoller.new(queue_url, attribute_names: [APPROXIMATE_RECEIVE_COUNT]) # Polling will block indefinitely unless we force it to stop poller.before_request do |stats| unless context.running? EventQ.logger.info("AWS Poller shutting down") throw :stop_polling end end poller.poll(skip_delete: true, wait_time_seconds: options[:queue_poll_wait]) do |msg, stats| begin tag_processing_thread process_message(msg, poller, queue, block) rescue => e EventQ.logger.error do "[#{self.class}] - An unhandled error occurred. Error: #{e} | Backtrace: #{e.backtrace}" end context.call_on_error_block(error: e) ensure untag_processing_thread end end end |