Module: Sidekiq::LimitFetch::Global::Monitor

Extended by:
Monitor
Included in:
Monitor
Defined in:
lib/sidekiq/limit_fetch/global/monitor.rb

Constant Summary collapse

HEARTBEAT_PREFIX =
'limit:heartbeat:'
PROCESS_SET =
'limit:processes'
HEARTBEAT_TTL =
20
REFRESH_TIMEOUT =
5

Instance Method Summary collapse

Instance Method Details

#add_dynamic_queues ⇒ Object



37
38
39
40
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 37

# Registers the names of every queue reported by Sidekiq::Queue.all with
# Sidekiq::LimitFetch::Queues, but only when that registry reports itself
# as dynamic.
#
# @return [Object, nil] whatever Queues.add returns, or nil when the
#   registry is not dynamic
def add_dynamic_queues
  registry = Sidekiq::LimitFetch::Queues
  return unless registry.dynamic?

  queue_names = Sidekiq::Queue.all.map { |queue| queue.name }
  registry.add(queue_names)
end

#all_processes ⇒ Object



21
22
23
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 21

# Reads the members of the Redis set stored under PROCESS_SET.
#
# @return [Object] the result of SMEMBERS on PROCESS_SET, as returned by
#   the connection yielded by Sidekiq.redis
def all_processes
  Sidekiq.redis do |conn|
    conn.smembers(PROCESS_SET)
  end
end

#old_processes ⇒ Object



25
26
27
28
29
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 25

# Lists the registered processes that no longer have a heartbeat value.
#
# A process is kept out of the result when GET on its heartbeat key returns
# a truthy value; every other process from all_processes is returned.
# NOTE(review): heartbeat_key is defined outside this listing — presumably
# it derives the key from HEARTBEAT_PREFIX; confirm.
#
# The connection is checked out once for the whole scan rather than once
# per process, and Redis is not touched at all when nothing is registered.
#
# @return [Array] the processes whose heartbeat lookup came back empty
def old_processes
  candidates = all_processes
  return [] if candidates.empty?

  Sidekiq.redis do |conn|
    candidates.reject { |process| conn.get(heartbeat_key(process)) }
  end
end

#remove_old_processes! ⇒ Object



31
32
33
34
35
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 31

# Removes every process returned by old_processes from the Redis set stored
# under PROCESS_SET, issuing one SREM per process on a single connection.
#
# @return [Object] the value of the Sidekiq.redis block, i.e. the
#   collection returned by old_processes
def remove_old_processes!
  Sidekiq.redis do |conn|
    old_processes.each do |stale_process|
      conn.srem(PROCESS_SET, stale_process)
    end
  end
end

#start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) ⇒ Object



10
11
12
13
14
15
16
17
18
19
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 10

# Spawns a background thread that repeats the monitor cycle forever:
# add_dynamic_queues, update_heartbeat(ttl), invalidate_old_processes, then
# sleep for +timeout+.
#
# NOTE(review): update_heartbeat and invalidate_old_processes are defined
# outside this listing; their behaviour is not described here.
# NOTE(review): nothing inside the loop rescues errors, so an exception
# raised by any step ends the thread — confirm that is intended.
#
# @param ttl [Object] value forwarded to update_heartbeat on every cycle
#   (defaults to HEARTBEAT_TTL)
# @param timeout [Object] value forwarded to sleep between cycles
#   (defaults to REFRESH_TIMEOUT)
# @return [Thread] the thread running the loop
def start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT)
  Thread.new(ttl, timeout) do |heartbeat_ttl, pause|
    loop do
      add_dynamic_queues
      update_heartbeat(heartbeat_ttl)
      invalidate_old_processes
      sleep(pause)
    end
  end
end