Module: Sidekiq::LimitFetch::Global::Monitor
Constant Summary collapse
- HEARTBEAT_PREFIX = 'limit:heartbeat:'
- PROCESS_SET = 'limit:processes'
- HEARTBEAT_TTL = 20
- REFRESH_TIMEOUT = 5
Instance Method Summary collapse
- #add_dynamic_queues ⇒ Object
- #all_processes ⇒ Object
- #old_processes ⇒ Object
- #remove_old_processes! ⇒ Object
- #start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) ⇒ Object
Instance Method Details
#add_dynamic_queues ⇒ Object
37 38 39 40 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 37
#
# Passes the names of all queues returned by Sidekiq::Queue.all to
# Sidekiq::LimitFetch::Queues.add, but only when that registry reports
# itself as dynamic. Returns nil when the registry is not dynamic.
def add_dynamic_queues
  registry = Sidekiq::LimitFetch::Queues
  return unless registry.dynamic?

  registry.add(Sidekiq::Queue.all.map(&:name))
end
#all_processes ⇒ Object
21 22 23 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 21
#
# Reads every member of the Redis set stored under PROCESS_SET and returns
# whatever the SMEMBERS call yields.
def all_processes
  Sidekiq.redis do |conn|
    conn.smembers(PROCESS_SET)
  end
end
#old_processes ⇒ Object
25 26 27 28 29 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 25
#
# Returns the members of the process set whose heartbeat key currently has no
# value in Redis (GET returns nil for them).
#
# The connection is checked out once for the whole scan instead of once per
# process id; the resulting list is the same.
def old_processes
  processes = all_processes

  Sidekiq.redis do |conn|
    # A process is kept out of the result while its heartbeat key still
    # resolves to a value.
    processes.reject { |process| conn.get(heartbeat_key(process)) }
  end
end
#remove_old_processes! ⇒ Object
31 32 33 34 35 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 31
#
# Issues one SREM against PROCESS_SET for every id returned by old_processes.
# The block evaluates to that list of ids.
def remove_old_processes!
  Sidekiq.redis do |conn|
    stale = old_processes
    stale.each do |process|
      conn.srem(PROCESS_SET, process)
    end
  end
end
#start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) ⇒ Object
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 10
#
# Spawns a background thread that loops forever, running in order:
# add_dynamic_queues, update_heartbeat(ttl), invalidate_old_processes, and
# then sleeping for +timeout+ before the next pass.
# (update_heartbeat and invalidate_old_processes are defined outside this
# excerpt.)
#
# ttl     - value handed to update_heartbeat (defaults to HEARTBEAT_TTL).
# timeout - argument to sleep between passes (defaults to REFRESH_TIMEOUT).
#
# Returns the Thread that was started.
#
# NOTE(review): nothing here rescues errors, so an exception that propagates
# out of any step ends the loop and the thread.
def start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT)
  Thread.new(ttl, timeout) do |heartbeat_ttl, pause|
    loop do
      add_dynamic_queues
      update_heartbeat(heartbeat_ttl)
      invalidate_old_processes
      sleep(pause)
    end
  end
end