Class: Sidekiq::HeartbeatMonitor::Scheduler

Inherits:
Object
  • Object
show all
Includes:
Util, Worker
Defined in:
lib/sidekiq/heartbeat_monitor/scheduler.rb

Constant Summary collapse

TIME_BETWEEN_HEARTBEATS =
60

Instance Method Summary collapse

Methods included from Util

#format_time_str

Instance Method Details

#check_queue_size!(q, queue_config) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/sidekiq/heartbeat_monitor/scheduler.rb', line 43

# Sends a "backed up" alert when a queue holds more jobs than its configured limit.
#
# The queue size is read exactly once so that the number compared against the
# limit is the same number reported in the alert message.
#
# @param q [#name, #size] the queue to inspect
# @param queue_config [#max_queue_size, #send_backed_up_alert!] settings for +q+
# @return [Object, nil] the result of +send_backed_up_alert!+, or nil when the
#   queue size does not exceed the limit
def check_queue_size!(q, queue_config)
  max_queue_size = queue_config.max_queue_size
  current_size = q.size

  return unless current_size > max_queue_size

  queue_config.send_backed_up_alert!(
    "⚠️ _#{q.name}_ queue has more than #{max_queue_size} jobs waiting to be processed. Current size is #{current_size}",
    q
  )
end

#perform ⇒ Object

Checks whether each queue holds more jobs than its configured max_queue_size, and also schedules the heartbeat job for each queue.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/sidekiq/heartbeat_monitor/scheduler.rb', line 12

# Checks every Sidekiq queue for a backlog and schedules its next heartbeat job.
#
# For each queue returned by +Sidekiq::Queue.all+:
#
# 1. +check_queue_size!+ is run with the queue's heartbeat-monitor config.
# 2. The time the last heartbeat was enqueued is read from Redis.
#    * If a plausible timestamp is stored and the allowed delay has not been
#      exceeded, the queue is skipped for this run (nothing is re-enqueued).
#    * If a plausible timestamp is stored and the delay has been exceeded, a
#      "slowed down" alert is sent.
# 3. Otherwise (no timestamp, or after alerting) the timestamp is reset to now
#    and a new heartbeat worker job is pushed onto the queue.
#
# @return [Object] whatever the +Sidekiq.redis+ block evaluates to
def perform
  Sidekiq.redis do |redis|
    Sidekiq::Queue.all.each do |q|
      queue_config = Sidekiq::HeartbeatMonitor.config(q)
      check_queue_size!(q, queue_config)

      key = "Sidekiq:HeartbeatMonitor:Worker-#{q.name}.enqueued_at"

      # A missing key gives nil.to_i == 0, which fails the sanity check below.
      last_enqueued_at = redis.get(key).to_i

      # Only trust values newer than 1_577_997_505 (Unix time on Jan 2, 2020,
      # when this code was written); anything older is treated as "not recorded".
      if last_enqueued_at > 1_577_997_505
        time_since_enqueued = Time.now.to_i - last_enqueued_at
        overdue = (time_since_enqueued - TIME_BETWEEN_HEARTBEATS) > queue_config.max_heartbeat_delay

        # NOTE(review): presumably the previous heartbeat is still waiting in
        # the queue here, so another is not enqueued — confirm against Worker.
        next unless overdue

        queue_config.send_slowed_down_alert!("⚠️ _#{q.name}_ queue is taking longer than #{format_time_str(time_since_enqueued)} to reach jobs.", q)
      end

      # TTL of one week, as plain integer seconds so no duration helper is needed.
      redis.set(key, Time.now.to_i, ex: 604_800)

      Sidekiq::HeartbeatMonitor::Worker.client_push(
        'class' => Sidekiq::HeartbeatMonitor::Worker,
        'args'  => [q.name],
        'queue' => q.name
      )
    end
  end
end