Class: WorkShaper::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/work_shaper/worker.rb

Overview

The worker that runs the stuff

Instance Method Summary collapse

Constructor Details

#initialize(work, on_done, ack_handler, on_error, last_ack, semaphore, max_in_queue) ⇒ Worker

rubocop:disable Metrics/ParameterLists rubocop:disable Layout/LineLength

Parameters:

  • work (Lambda)

    Lambda that we will #call(message) to execute work.

  • on_done (Lambda)

    Lambda that we #call(partition, offset) when work is done.

  • on_error (Lambda)

    Lambda that we #call(exception) if an error is encountered.



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/work_shaper/worker.rb', line 9

def initialize(work, on_done, ack_handler, on_error, last_ack, semaphore, max_in_queue)
  @jobs = []
  @work = work
  @on_done = on_done
  @ack_handler = ack_handler
  @on_error = on_error
  @last_ack = last_ack
  @semaphore = semaphore
  @max_in_queue = max_in_queue
  @thread_pool = Concurrent::FixedThreadPool.new(1, auto_terminate: false)
end

Instance Method Details

#enqueue(message, offset_holder) ⇒ Object

rubocop:enable Metrics/ParameterLists rubocop:enable Layout/LineLength



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/work_shaper/worker.rb', line 24

def enqueue(message, offset_holder)
  partition = offset_holder.partition
  offset = offset_holder.offset

  # rubocop:disable Style/RescueStandardError
  @thread_pool.post do
    @work.call(message, partition, offset)
    @on_done.call(message, partition, offset)
  rescue => e
    WorkShaper.logger.error("Error processing #{partition}:#{offset} #{e}")
    WorkShaper.logger.error(e.backtrace.join(" > "))
    @on_error.call(e, message, partition, offset)
  ensure
    @semaphore.synchronize do
      WorkShaper.logger.debug "Completed: #{partition}:#{offset}"
      offset_holder.complete!
    end
  end
  # rubocop:enable Style/RescueStandardError
end

#shutdownObject



45
46
47
48
49
50
51
# File 'lib/work_shaper/worker.rb', line 45

def shutdown
  # Cannot call logger from trap{}
  WorkShaper.logger.info({message: 'Shutting down worker'})
  @thread_pool.shutdown
  @thread_pool.wait_for_termination
  sleep 0.05 while @thread_pool.queue_length.positive?
end