Class: EventQ::Amazon::QueueWorkerV2
- Inherits:
-
Object
- Object
- EventQ::Amazon::QueueWorkerV2
- Includes:
- WorkerId
- Defined in:
- lib/eventq_aws/aws_queue_worker_v2.rb
Constant Summary collapse
- APPROXIMATE_RECEIVE_COUNT =
'ApproximateReceiveCount'.freeze
- MESSAGE =
'Message'.freeze
Instance Attribute Summary collapse
-
#is_running ⇒ Object
Returns the value of attribute is_running.
Instance Method Summary collapse
- #call_on_error_block(error:, message: nil) ⇒ Object
- #call_on_retry_block(message) ⇒ Object
- #call_on_retry_exceeded_block(message) ⇒ Object
- #deserialize_message(payload) ⇒ Object
-
#initialize ⇒ QueueWorkerV2
constructor
A new instance of QueueWorkerV2.
- #on_error(&block) ⇒ Object
- #on_retry(&block) ⇒ Object
- #on_retry_exceeded(&block) ⇒ Object
- #running? ⇒ Boolean
- #serialize_message(msg) ⇒ Object
- #start(queue, options = {}, &block) ⇒ Object
- #start_process(options, queue, block) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ QueueWorkerV2
Returns a new instance of QueueWorkerV2.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 13

# Builds a worker that is not running and has no callback blocks stored.
#
# Also creates the collaborators the worker keeps for its lifetime: a
# HashKit helper plus the serialization and signature provider managers.
def initialize
  # Process bookkeeping: no forked pids yet, worker not running.
  @forks = []
  @is_running = false

  # Callback slots all start out empty.
  @on_retry_exceeded_block = nil
  @on_retry_block = nil
  @on_error_block = nil

  # Long-lived helper objects.
  @hash_helper = HashKit::Helper.new
  @serialization_provider_manager = EventQ::SerializationProviders::Manager.new
  @signature_provider_manager = EventQ::SignatureProviders::Manager.new

  # NOTE(review): the unit of this value is not visible here -- confirm where it is read.
  @queue_poll_wait = 10
end
Instance Attribute Details
#is_running ⇒ Object
Returns the value of attribute is_running.
11 12 13 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 11

# Reader for the running flag.
#
# @return [Object] the current value of @is_running
def is_running
  @is_running
end
Instance Method Details
#call_on_error_block(error:, message: nil) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 106

# Runs the stored on_error block, if there is one, handing it the error and
# the message that was being handled.
#
# Any StandardError raised by the block itself is logged and swallowed so a
# faulty callback cannot take the caller down.
#
# @param error [Object] the error to report to the block
# @param message [Object, nil] the message associated with the error, if known
# @return [Object, nil] whatever the last executed call returned; not meaningful to callers
def call_on_error_block(error:, message: nil)
  if @on_error_block
    EventQ.logger.debug { "[#{self.class}] - Executing on_error block." }
    begin
      # Forward the message as well; previously the keyword was accepted but dropped.
      @on_error_block.call(error, message)
    rescue => e
      EventQ.logger.error("[#{self.class}] - An error occurred executing the on_error block. Error: #{e}")
    end
  else
    EventQ.logger.debug { "[#{self.class}] - No on_error block specified to execute." }
  end
end
#call_on_retry_block(message) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 132

# Runs the stored on_retry block, if there is one, with the message and an
# abort flag.
#
# No local named +abort+ exists in this method, so a bare +abort+ in the call
# would resolve to Kernel#abort and raise SystemExit, which +rescue => e+
# (StandardError only) does not catch. The flag is therefore an explicit
# optional parameter; callers that pass only the message keep working.
#
# Any StandardError raised by the block itself is logged and swallowed.
#
# @param message [Object] the message being retried
# @param abort [Boolean] flag passed through to the block as its second argument
#   (NOTE(review): false is an assumed default -- confirm what callers expect)
# @return [Object, nil] whatever the last executed call returned; not meaningful to callers
def call_on_retry_block(message, abort = false)
  if @on_retry_block
    EventQ.logger.debug { "[#{self.class}] - Executing on_retry block." }
    begin
      @on_retry_block.call(message, abort)
    rescue => e
      EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry block. Error: #{e}")
    end
  else
    EventQ.logger.debug { "[#{self.class}] - No on_retry block specified." }
  end
end
#call_on_retry_exceeded_block(message) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 119

# Runs the block stored in @on_retry_exceeded_block, if there is one, with
# the given message.
#
# Any StandardError raised by the block itself is logged and swallowed.
#
# @param message [Object] the message handed to the block
# @return [Object, nil] whatever the last executed call returned; not meaningful to callers
def call_on_retry_exceeded_block(message)
  if @on_retry_exceeded_block
    EventQ.logger.debug { "[#{self.class}] - Executing on_retry_exceeded block." }
    begin
      @on_retry_exceeded_block.call(message)
    rescue => e
      EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry_exceeded block. Error: #{e}")
    end
  else
    EventQ.logger.debug { "[#{self.class}] - No on_retry_exceeded block specified." }
  end
end
#deserialize_message(payload) ⇒ Object
169 170 171 172 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 169

# Deserializes a payload with the serialization provider named by
# EventQ::Configuration.serialization_provider.
#
# @param payload [Object] the serialized payload to decode
# @return [Object] whatever the provider's #deserialize returns
def deserialize_message(payload)
  provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider)
  provider.deserialize(payload)
end
#on_error(&block) ⇒ Object
160 161 162 163 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 160

# Stores the block that call_on_error_block will run.
#
# @yield the block to store; call_on_error_block decides what it receives
# @return [nil]
def on_error(&block)
  @on_error_block = block
  nil
end
#on_retry(&block) ⇒ Object
155 156 157 158 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 155

# Stores the block that call_on_retry_block will run.
#
# @yield the block to store; call_on_retry_block decides what it receives
# @return [nil]
def on_retry(&block)
  @on_retry_block = block
  nil
end
#on_retry_exceeded(&block) ⇒ Object
151 152 153 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 151

# Stores the block that call_on_retry_exceeded_block will run.
#
# The block is kept in @on_retry_exceeded_block, the same instance variable
# that the constructor initializes and call_on_retry_exceeded_block reads.
# Writing to any other name would leave the callback permanently unset.
#
# @yield [message] the block to store
# @return [Proc, nil] the stored block
def on_retry_exceeded(&block)
  @on_retry_exceeded_block = block
end
#running? ⇒ Boolean
165 166 167 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 165

# Reports whether the worker is currently flagged as running.
#
# @return [Boolean] the value of @is_running
def running?
  @is_running
end
#serialize_message(msg) ⇒ Object
174 175 176 177 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 174

# Serializes a message with the serialization provider named by
# EventQ::Configuration.serialization_provider.
#
# @param msg [Object] the message to encode
# @return [Object] whatever the provider's #serialize returns
def serialize_message(msg)
  provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider)
  provider.serialize(msg)
end
#start(queue, options = {}, &block) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 28

# Starts listening for messages on the given queue.
#
# Applies the options via #configure, validates that a client was supplied
# and that the worker is not already running, then either runs the polling
# process inline or, when @fork_count is greater than 1, forks that many
# child processes from a background thread that waits on them.
#
# @param queue [Object] the queue to listen on; must respond to #name
# @param options [Hash] worker options; :client is required
# @yield the handler passed through to #start_process for each message
# @return [true]
# @raise [RuntimeError] if options[:client] is nil or the worker is already running
def start(queue, options = {}, &block)
  EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.")

  configure(queue, options)

  raise "[#{self.class}] - :client (QueueClient) must be specified." if options[:client].nil?
  raise "[#{self.class}] - Worker is already running." if running?

  client = options[:client]
  EventQ.logger.debug do
    "[#{self.class} #start] - Listening for messages on queue: #{queue.name}, Queue Url: #{client.get_queue_url(queue)}, Queue arn: #{client.get_queue_arn(queue)}"
  end

  EventQ.logger.info("[#{self.class}] - Listening for messages.")

  @forks = []

  if @fork_count > 1
    # Fork and wait from a separate thread so this call returns right away.
    Thread.new do
      @fork_count.times do
        pid = fork do
          start_process(options, queue, block)
        end
        @forks.push(pid)
      end
      @forks.each { |pid| Process.wait(pid) }
    end
  else
    start_process(options, queue, block)
  end

  true
end
#start_process(options, queue, block) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 66

# Runs the polling loop for one worker process.
#
# Installs INT/TERM handlers that stop the worker and exit, marks the worker
# as running, and starts a thread that polls the queue with an
# Aws::SQS::QueuePoller (skip_delete: true). Each received message is handed
# to the message handler; an error from that handler is logged and reported
# through call_on_error_block. When options[:wait] is true, or
# options[:fork_count] is greater than 1, the calling thread then sleeps in
# 5-second steps for as long as the worker is running.
#
# @param options [Hash] worker options; reads :client, :wait and :fork_count
# @param queue [Object] the queue to poll
# @param block [Proc, nil] the handler passed along with each message
# @return [nil]
def start_process(options, queue, block)
  %w[INT TERM].each do |sig|
    Signal.trap(sig) do
      stop
      exit
    end
  end

  @is_running = true

  Thread.new do
    client = options[:client]
    manager = EventQ::Amazon::QueueManager.new({ client: client })
    queue_url = manager.get_queue(queue)
    poller = Aws::SQS::QueuePoller.new(queue_url, attribute_names: [APPROXIMATE_RECEIVE_COUNT])

    poller.poll(skip_delete: true) do |msg, _stats|
      begin
        tag_processing_thread
        # NOTE(review): the method name was missing from the extracted listing and is
        # restored here as process_message -- confirm against the source file.
        process_message(msg, poller, queue, block)
      rescue => e
        EventQ.logger.error do
          "[#{self.class}] - An unhandled error occurred. Error: #{e} | Backtrace: #{e.backtrace}"
        end
        call_on_error_block(error: e)
      ensure
        untag_processing_thread
      end
    end
  end

  if (options.key?(:wait) && options[:wait] == true) || (options.key?(:fork_count) && options[:fork_count] > 1)
    while running? do
      sleep 5
    end
  end
end
#stop ⇒ Object
145 146 147 148 149 |
# File 'lib/eventq_aws/aws_queue_worker_v2.rb', line 145

# Flags the worker as no longer running and logs that it is stopping.
#
# @return [true]
def stop
  EventQ.logger.info("[#{self.class}] - Stopping.")
  @is_running = false
  true
end