Class: WorkShaper::Manager
- Inherits:
-
Object
- Object
- WorkShaper::Manager
- Defined in:
- lib/work_shaper/manager.rb
Overview
The Manager is responsible for organizing the work to be done, triggering calls to acknowledge work done for each offset in monotonically increasing order (independent of the execution order), and gracefully cleaning up when ‘#shutdown` is called.
Instance Attribute Summary collapse
-
#shutting_down ⇒ Object
readonly
Returns the value of attribute shutting_down.
-
#total_acked ⇒ Object
readonly
Returns the value of attribute total_acked.
-
#total_enqueued ⇒ Object
readonly
Returns the value of attribute total_enqueued.
Instance Method Summary collapse
-
#enqueue(sub_key, message, partition, offset) ⇒ Object
Enqueue a message to be worked on the given ‘sub_key`, `partition`, and `offset`.
-
#flush(safe: true) ⇒ Object
Flush any offsets for which work has been completed.
-
#initialize(work:, on_done:, ack:, on_error:, max_in_queue: 3, heartbeat_period_sec: 60, offset_commit_period_ms: 5) ⇒ Manager
constructor
Several of the parameters here are Lambdas (not Proc).
-
#report(detailed: false) ⇒ Object
Output state of Last Acked and Pending Offset Ack’s.
-
#shutdown ⇒ Object
Stop the underlying threads.
Constructor Details
#initialize(work:, on_done:, ack:, on_error:, max_in_queue: 3, heartbeat_period_sec: 60, offset_commit_period_ms: 5) ⇒ Manager
Several of the parameters here are Lambdas (not Proc). Note you can pass a method using ‘method(:some_method)` or a lambda directly `->{ puts ’Hello’}‘.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/work_shaper/manager.rb', line 19 def initialize(work:, on_done:, ack:, on_error:, max_in_queue: 3, heartbeat_period_sec: 60, offset_commit_period_ms: 5) @work = work @on_done = on_done @ack = ack @on_error = on_error @workers = {} @last_ack = {} @received_offsets = {} @max_in_queue = max_in_queue @semaphore = Mutex.new @shutting_down = false @total_enqueued = 0 @total_acked = 0 @heartbeat = Thread.new do while true report(detailed: true) sleep heartbeat_period_sec end rescue => e WorkShaper.logger.warn({ message: 'Shutdown from Heartbeat', error: e }) shutdown end @offset_manager = Thread.new do while true @received_offsets.each_key do |partition| offset_ack(partition) end sleep offset_commit_period_ms / 1000.0 end rescue => e WorkShaper.logger.warn({ message: 'Shutdown from Offset Manager', error: e }) shutdown end end |
Instance Attribute Details
#shutting_down ⇒ Object (readonly)
Returns the value of attribute shutting_down.
6 7 8 |
# File 'lib/work_shaper/manager.rb', line 6 def shutting_down @shutting_down end |
#total_acked ⇒ Object (readonly)
Returns the value of attribute total_acked.
6 7 8 |
# File 'lib/work_shaper/manager.rb', line 6 def total_acked @total_acked end |
#total_enqueued ⇒ Object (readonly)
Returns the value of attribute total_enqueued.
6 7 8 |
# File 'lib/work_shaper/manager.rb', line 6 def total_enqueued @total_enqueued end |
Instance Method Details
#enqueue(sub_key, message, partition, offset) ⇒ Object
Enqueue a message to be worked on the given ‘sub_key`, `partition`, and `offset`.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/work_shaper/manager.rb', line 59 def enqueue(sub_key, , partition, offset) raise StandardError, 'Shutting down' if @shutting_down pause_on_overrun offset_holder = OffsetHolder.new(partition, offset) WorkShaper.logger.debug "Enqueue: #{sub_key}/#{offset_holder}" worker = nil @semaphore.synchronize do @total_enqueued += 1 (@received_offsets[partition] ||= Array.new) << offset_holder worker = @workers[sub_key] ||= Worker.new( @work, @on_done, method(:offset_ack), @on_error, @last_ack, @semaphore, @max_in_queue ) end worker.enqueue(, offset_holder) end |
#flush(safe: true) ⇒ Object
Flush any offsets for which work has been completed. Only lowest continuous run of offsets will be acknowledged. Any offset after a discontinuity will be replayed when the consumer restarts.
90 91 92 93 94 95 |
# File 'lib/work_shaper/manager.rb', line 90 def flush(safe: true) sleep 5 @received_offsets.each_key do |k| safe ? offset_ack(k) : offset_ack_unsafe(k) end end |
#report(detailed: false) ⇒ Object
Output state of Last Acked and Pending Offset Ack’s.
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/work_shaper/manager.rb', line 98 def report(detailed: false) @semaphore.synchronize do WorkShaper.logger.info( { message: 'Reporting', total_enqueued: @total_enqueued, total_acked: @total_acked, in_flight: (@total_enqueued.to_i - @total_acked.to_i), last_acked_offsets: @last_ack, worker_count: @workers.keys.count, offset_mgr: @offset_manager.status }) if detailed WorkShaper.logger.info( { message: 'Reporting - Extra Detail', pending_ack: @completed_offsets, received_offsets: @received_offsets }) end end end |
#shutdown ⇒ Object
Stop the underlying threads
120 121 122 123 124 125 126 127 |
# File 'lib/work_shaper/manager.rb', line 120 def shutdown @shutting_down = true WorkShaper.logger.warn({ message: 'Shutting Down' }) Thread.kill(@heartbeat) Thread.kill(@offset_manager) report(detailed: true) @workers.each_value(&:shutdown) end |