Class: WorkShaper::Worker
- Inherits:
-
Object
- Object
- WorkShaper::Worker
- Defined in:
- lib/work_shaper/worker.rb
Overview
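Runs submitted messages on a dedicated single-thread pool and reports the outcome of each one through the callbacks supplied to the constructor.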
Instance Method Summary collapse
-
#enqueue(message, offset_holder) ⇒ Object
Posts a job to the worker's thread pool that runs the work callback for the message and then marks the offset holder complete.
-
#initialize(work, on_done, ack_handler, on_error, last_ack, semaphore, max_in_queue) ⇒ Worker
constructor
Stores the supplied callbacks and settings and creates the single-thread pool that runs the jobs.
- #shutdown ⇒ Object
Constructor Details
#initialize(work, on_done, ack_handler, on_error, last_ack, semaphore, max_in_queue) ⇒ Worker
Stores the supplied callbacks and settings and creates the single-thread pool that runs the jobs.
9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/work_shaper/worker.rb', line 9
#
# Builds a worker backed by its own single-thread pool.
#
# Every argument is stored in an instance variable of the same name.
#
# @param work [#call] invoked by #enqueue as work.call(message, partition, offset)
# @param on_done [#call] invoked by #enqueue with the same three arguments
#   after the work callback returns without raising
# @param ack_handler [Object] stored only; not used by the methods shown here
# @param on_error [#call] invoked by #enqueue as
#   on_error.call(error, message, partition, offset) when the job raises
# @param last_ack [Object] stored only; not used by the methods shown here
# @param semaphore [#synchronize] held by #enqueue while an offset holder is
#   marked complete
# @param max_in_queue [Object] stored only; not used by the methods shown here
def initialize(work, on_done, ack_handler, on_error, last_ack, semaphore, max_in_queue)
  # Callbacks.
  @work = work
  @on_done = on_done
  @on_error = on_error
  @ack_handler = ack_handler

  # Bookkeeping handed in by the caller.
  @last_ack = last_ack
  @semaphore = semaphore
  @max_in_queue = max_in_queue
  @jobs = []

  # A pool of exactly one thread.
  @thread_pool = Concurrent::FixedThreadPool.new(1, auto_terminate: false)
end
Instance Method Details
#enqueue(message, offset_holder) ⇒ Object
Posts a job to the worker's thread pool that runs the work callback for the message and then marks the offset holder complete.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/work_shaper/worker.rb', line 24
#
# Schedules one message for processing on the worker's thread pool.
#
# The posted job calls @work with (message, partition, offset) and, when that
# returns normally, @on_done with the same arguments. A StandardError raised
# by either call is logged (message, then backtrace) and handed to @on_error.
# Whether the job succeeded or failed, the offset holder is marked complete
# while @semaphore is held.
#
# @param message [Object] payload passed unchanged to the callbacks
# @param offset_holder [#partition, #offset, #complete!] supplies the
#   partition/offset pair and receives complete! once the job has finished
# @return [Object] whatever @thread_pool.post returns
def enqueue(message, offset_holder)
  # Read both values up front so the job and its log lines use a fixed pair.
  partition = offset_holder.partition
  offset = offset_holder.offset

  # rubocop:disable Style/RescueStandardError
  @thread_pool.post do
    @work.call(message, partition, offset)
    @on_done.call(message, partition, offset)
  rescue => e
    WorkShaper.logger.error("Error processing #{partition}:#{offset} #{e}")
    WorkShaper.logger.error(e.backtrace.join(" > "))
    @on_error.call(e, message, partition, offset)
  ensure
    # Runs on success and on failure alike.
    @semaphore.synchronize do
      WorkShaper.logger.debug "Completed: #{partition}:#{offset}"
      offset_holder.complete!
    end
  end
  # rubocop:enable Style/RescueStandardError
end
#shutdown ⇒ Object
45 46 47 48 49 50 51 |
# File 'lib/work_shaper/worker.rb', line 45
#
# Logs the shutdown, asks the thread pool to shut down, and waits for it to
# terminate. Afterwards it polls every 0.05s until the pool reports an empty
# queue.
#
# @return [nil]
def shutdown
  # Cannot call logger from trap{}
  # NOTE(review): the logger is nevertheless called on the next line —
  # confirm this method is never invoked from inside a signal trap handler.
  WorkShaper.logger.info({ message: 'Shutting down worker' })

  pool = @thread_pool
  pool.shutdown
  pool.wait_for_termination

  while pool.queue_length.positive?
    sleep 0.05
  end
end