Class: WorkShaper::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/work_shaper/manager.rb

Overview

The Manager is responsible for organizing the work to be done, triggering calls to acknowledge work done for each offset in monotonically increasing order (independent of the execution order), and gracefully cleaning up when ‘#shutdown` is called.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(work:, on_done:, ack:, on_error:, max_in_queue: 3, heartbeat_period_sec: 60, offset_commit_period_ms: 5) ⇒ Manager

Several of the parameters here are Lambdas (not Proc). Note you can pass a method using ‘method(:some_method)` or a lambda directly `->{ puts ’Hello’}‘.

Parameters:

  • work (#call(message, partition, offset))

    Lambda that we will call to execute work.

  • on_done (#call(message, partition, offset))

    Lambda that we call when work is done.

  • ack

    Lambda we will call when it is safe to commit an offset. This is not the

    same as Done.

  • on_error (#call(exception, message, partition, offset))

    Lambda that we call if an error is encountered.

  • max_in_queue (Integer) (defaults to: 3)

    The maximum in flight jobs per Sub Key. This affects how many message could get replayed if your process crashes before the offsets are committed.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/work_shaper/manager.rb', line 19

def initialize(work:, on_done:, ack:, on_error:, max_in_queue: 3,
               heartbeat_period_sec: 60, offset_commit_period_ms: 5)
  @work = work
  @on_done = on_done
  @ack = ack
  @on_error = on_error
  @workers = {}
  @last_ack = {}
  @received_offsets = {}
  @max_in_queue = max_in_queue
  @semaphore = Mutex.new
  @shutting_down = false

  @total_enqueued = 0
  @total_acked = 0

  @heartbeat = Thread.new do
    while true
      report(detailed: true)
      sleep heartbeat_period_sec
    end
  rescue => e
    WorkShaper.logger.warn({ message: 'Shutdown from Heartbeat', error: e })
    shutdown
  end

  @offset_manager = Thread.new do
    while true
      @received_offsets.each_key do |partition|
        offset_ack(partition)
      end
      sleep offset_commit_period_ms / 1000.0
    end
  rescue => e
    WorkShaper.logger.warn({ message: 'Shutdown from Offset Manager', error: e })
    shutdown
  end
end

Instance Attribute Details

#shutting_downObject (readonly)

Returns the value of attribute shutting_down.



6
7
8
# File 'lib/work_shaper/manager.rb', line 6

def shutting_down
  @shutting_down
end

#total_ackedObject (readonly)

Returns the value of attribute total_acked.



6
7
8
# File 'lib/work_shaper/manager.rb', line 6

def total_acked
  @total_acked
end

#total_enqueuedObject (readonly)

Returns the value of attribute total_enqueued.



6
7
8
# File 'lib/work_shaper/manager.rb', line 6

def total_enqueued
  @total_enqueued
end

Instance Method Details

#enqueue(sub_key, message, partition, offset) ⇒ Object

Enqueue a message to be worked on the given ‘sub_key`, `partition`, and `offset`.

Raises:

  • (StandardError)


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/work_shaper/manager.rb', line 59

def enqueue(sub_key, message, partition, offset)
  raise StandardError, 'Shutting down' if @shutting_down
  pause_on_overrun

  offset_holder = OffsetHolder.new(partition, offset)
  WorkShaper.logger.debug "Enqueue: #{sub_key}/#{offset_holder}"

  worker = nil
  @semaphore.synchronize do
    @total_enqueued += 1
    (@received_offsets[partition] ||= Array.new) << offset_holder

    worker =
      @workers[sub_key] ||=
        Worker.new(
          @work,
          @on_done,
          method(:offset_ack),
          @on_error,
          @last_ack,
          @semaphore,
          @max_in_queue
        )
  end

  worker.enqueue(message, offset_holder)
end

#flush(safe: true) ⇒ Object

Flush any offsets for which work has been completed. Only lowest continuous run of offsets will be acknowledged. Any offset after a discontinuity will be replayed when the consumer restarts.



90
91
92
93
94
95
# File 'lib/work_shaper/manager.rb', line 90

def flush(safe: true)
  sleep 5
  @received_offsets.each_key do |k|
    safe ? offset_ack(k) : offset_ack_unsafe(k)
  end
end

#report(detailed: false) ⇒ Object

Output state of Last Acked and Pending Offset Ack’s.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/work_shaper/manager.rb', line 98

def report(detailed: false)
  @semaphore.synchronize do
    WorkShaper.logger.info(
      { message: 'Reporting', total_enqueued: @total_enqueued,
        total_acked: @total_acked,
        in_flight: (@total_enqueued.to_i - @total_acked.to_i),
        last_acked_offsets: @last_ack,
        worker_count: @workers.keys.count,
        offset_mgr: @offset_manager.status
      })
    if detailed
      WorkShaper.logger.info(
        {
          message: 'Reporting - Extra Detail',
          pending_ack: @completed_offsets,
          received_offsets: @received_offsets
        })
    end
  end
end

#shutdownObject

Stop the underlying threads



120
121
122
123
124
125
126
127
# File 'lib/work_shaper/manager.rb', line 120

def shutdown
  @shutting_down = true
  WorkShaper.logger.warn({ message: 'Shutting Down' })
  Thread.kill(@heartbeat)
  Thread.kill(@offset_manager)
  report(detailed: true)
  @workers.each_value(&:shutdown)
end