Class: QueueBus::Heartbeat
- Inherits:
-
Object
- Object
- QueueBus::Heartbeat
- Defined in:
- lib/queue_bus/heartbeat.rb
Overview
When run, will calculate all of the heartbeats that need to be sent and then broadcasts those events out for execution. By always backfilling it ensures that no heartbeat is ever missed.
Class Method Summary collapse
- .environment_name ⇒ Object
- .get_saved_minute! ⇒ Object
- .lock! ⇒ Object
- .lock_key ⇒ Object
- .lock_seconds ⇒ Object
- .perform(*_args) ⇒ Object
- .redis_key ⇒ Object
- .set_saved_minute!(epoch_minute) ⇒ Object
- .unlock! ⇒ Object
Class Method Details
.environment_name ⇒ Object
47 48 49 |
# File 'lib/queue_bus/heartbeat.rb', line 47 def environment_name ENV['RAILS_ENV'] || ENV['RACK_ENV'] || ENV['BUS_ENV'] end |
.get_saved_minute! ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/queue_bus/heartbeat.rb', line 51 def get_saved_minute! key = ::QueueBus.redis { |redis| redis.get(redis_key) } return nil if key.nil? case environment_name when 'development', 'test' # only 3 minutes in development; otherwise, TONS of events if not run in a while three_ago = Time.now.to_i / 60 - 3 key = three_ago if key.to_i < three_ago end key.to_i end |
.lock! ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/queue_bus/heartbeat.rb', line 17 def lock! now = Time.now.to_i timeout = now + lock_seconds + 2 ::QueueBus.redis do |redis| # return true if we successfully acquired the lock return timeout if redis.setnx(lock_key, timeout) # see if the existing timeout is still valid and return false if it is # (we cannot acquire the lock during the timeout period) return 0 if now <= redis.get(lock_key).to_i # otherwise set the timeout and ensure that no other worker has # acquired the lock if now > redis.getset(lock_key, timeout).to_i return timeout else return 0 end end end |
.lock_key ⇒ Object
9 10 11 |
# File 'lib/queue_bus/heartbeat.rb', line 9 def lock_key 'bus:heartbeat:lock' end |
.lock_seconds ⇒ Object
13 14 15 |
# File 'lib/queue_bus/heartbeat.rb', line 13 def lock_seconds 60 end |
.perform(*_args) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/queue_bus/heartbeat.rb', line 68 def perform(*_args) real_now = Time.now.to_i run_until = lock! - 2 return if run_until < real_now while (real_now = Time.now.to_i) < run_until minutes = real_now.to_i / 60 last = get_saved_minute! if last break if minutes <= last minutes = last + 1 end seconds = minutes * 60 hours = minutes / 60 days = minutes / (60 * 24) now = Time.at(seconds) attributes = {} attributes['epoch_seconds'] = seconds attributes['epoch_minutes'] = minutes attributes['epoch_hours'] = hours attributes['epoch_days'] = days attributes['minute'] = now.min attributes['hour'] = now.hour attributes['day'] = now.day attributes['month'] = now.month attributes['year'] = now.year attributes['yday'] = now.yday attributes['wday'] = now.wday ::QueueBus.publish('heartbeat_minutes', attributes) set_saved_minute!(minutes) end unlock! end |
.redis_key ⇒ Object
43 44 45 |
# File 'lib/queue_bus/heartbeat.rb', line 43 def redis_key 'bus:heartbeat:timestamp' end |
.set_saved_minute!(epoch_minute) ⇒ Object
64 65 66 |
# File 'lib/queue_bus/heartbeat.rb', line 64 def set_saved_minute!(epoch_minute) ::QueueBus.redis { |redis| redis.set(redis_key, epoch_minute) } end |
.unlock! ⇒ Object
39 40 41 |
# File 'lib/queue_bus/heartbeat.rb', line 39 def unlock! ::QueueBus.redis { |redis| redis.del(lock_key) } end |