Class: Lowkiq::Schedulers::Lag

Inherits:
Object
  • Object
show all
Defined in:
lib/lowkiq/schedulers/lag.rb

Instance Method Summary collapse

Constructor Details

#initialize(wait, metrics) ⇒ Lag

Returns a new instance of Lag.



4
5
6
7
# File 'lib/lowkiq/schedulers/lag.rb', line 4

# Stores the two collaborators used by the jobs this scheduler builds.
#
# @param wait [#call] invoked (with no arguments) by a built job when no
#   shard handler has a positive lag
# @param metrics [#call] invoked by a built job with an array of
#   `{ queue_name:, shard: }` hashes; each returned element must respond to `lag`
def initialize(wait, metrics)
  @wait, @metrics = wait, metrics
end

Instance Method Details

#build_job(shard_handlers) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/lowkiq/schedulers/lag.rb', line 9

# Builds a job that, each time it is called, processes the shard handler
# with the largest positive lag, or waits when none has a positive lag.
#
# Nothing is evaluated at build time; metrics are fetched on every call of
# the returned proc.
#
# @param shard_handlers [Array] objects responding to `queue_name`,
#   `shard_index` and `process`
# @return [Proc] a proc whose call returns the result of either
#   `shard_handler.process` or `@wait.call`
def build_job(shard_handlers)
  Proc.new do
    ids = shard_handlers.map do |handler|
      { queue_name: handler.queue_name, shard: handler.shard_index }
    end

    # Metrics are paired with handlers by position.
    # NOTE(review): presumably @metrics returns one entry per identifier, in
    # the same order -- confirm against the metrics implementation.
    lags = @metrics.call(ids).map(&:lag)

    # Only handlers with a strictly positive lag are candidates.
    candidates = shard_handlers.zip(lags).select { |pair| pair.last > 0 }
    busiest, _max_lag = candidates.max_by(&:last)

    busiest ? busiest.process : @wait.call
  end
end