Class: QueueBus::Heartbeat
- Inherits: Object
- Object
- QueueBus::Heartbeat
- Defined in:
- lib/queue_bus/heartbeat.rb
Overview
Publishes an event about the current time.
Class Method Summary
- .environment_name ⇒ Object
- .get_saved_minute! ⇒ Object
- .lock! ⇒ Object
- .lock_key ⇒ Object
- .lock_seconds ⇒ Object
- .perform(*args) ⇒ Object
- .redis_key ⇒ Object
- .set_saved_minute!(epoch_minute) ⇒ Object
- .unlock! ⇒ Object
Class Method Details
.environment_name ⇒ Object
46 47 48 |
# File 'lib/queue_bus/heartbeat.rb', line 46
#
# Name of the environment the process is running in.
#
# Checks RAILS_ENV, RACK_ENV and BUS_ENV in that order and returns the first
# one that is set to a non-empty value. An empty value is skipped so that it
# cannot mask a later variable (an empty String is truthy in Ruby).
#
# @return [String, nil] the environment name, or nil when none is set
def environment_name
  %w[RAILS_ENV RACK_ENV BUS_ENV]
    .map { |name| ENV[name] }
    .find { |value| value && !value.empty? }
end
.get_saved_minute! ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/queue_bus/heartbeat.rb', line 50
#
# Reads the last published epoch minute from redis (stored under redis_key).
#
# In the 'development' and 'test' environments the result is never older than
# three minutes ago, so a process that has not run for a while does not
# publish a huge backlog of heartbeat events.
#
# @return [Integer, nil] the saved epoch minute, or nil when nothing is stored
def get_saved_minute!
  saved = ::QueueBus.redis { |redis| redis.get(redis_key) }
  return nil if saved.nil?

  minute = saved.to_i
  case environment_name
  when 'development', 'test'
    # The stored value is in epoch MINUTES, so the floor must be computed in
    # minutes as well (comparing against epoch seconds would always clamp).
    oldest_allowed = Time.now.to_i / 60 - 3
    minute = oldest_allowed if minute < oldest_allowed
  end
  minute
end
.lock! ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/queue_bus/heartbeat.rb', line 15
#
# Tries to acquire the heartbeat lock stored in redis under lock_key.
#
# The value stored in the lock is its own expiry time in epoch seconds
# (now + lock_seconds + 2), so a lock left behind by another worker can be
# taken over once that time has passed.
#
# @return [Integer] the lock's expiry time (epoch seconds) when the lock was
#   acquired, or 0 when another worker holds it
def lock!
  now = Time.now.to_i
  timeout = now + lock_seconds + 2

  ::QueueBus.redis do |redis|
    # Nobody holds the lock: we own it now.
    return timeout if redis.setnx(lock_key, timeout)

    # Somebody holds it and their timeout has not passed yet.
    return 0 if now <= redis.get(lock_key).to_i

    # The existing lock has expired. Swap in our timeout; the value we get
    # back tells us whether another worker replaced it first.
    previous = redis.getset(lock_key, timeout).to_i
    return now > previous ? timeout : 0
  end
end
.lock_key ⇒ Object
7 8 9 |
# File 'lib/queue_bus/heartbeat.rb', line 7
#
# Redis key under which the heartbeat lock is stored.
#
# @return [String]
def lock_key
  "bus:heartbeat:lock"
end
.lock_seconds ⇒ Object
11 12 13 |
# File 'lib/queue_bus/heartbeat.rb', line 11
#
# Number of seconds used when computing the heartbeat lock's timeout.
#
# @return [Integer]
def lock_seconds
  60
end
.perform(*args) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/queue_bus/heartbeat.rb', line 66
#
# Publishes one "heartbeat_minutes" event for every minute that has not been
# published yet, up to the current minute.
#
# The work is guarded by lock!; when the lock cannot be acquired the method
# returns immediately without publishing. The lock is released with unlock!
# once publishing finishes, including when publishing raises, so a failure
# does not leave the lock held until it times out.
#
# @param args ignored
# @return [Object, nil] the result of unlock!, or nil when the lock was not
#   acquired
def perform(*args)
  started_at = Time.now.to_i
  # lock! returns 0 when the lock is held elsewhere, which makes run_until
  # negative and triggers the early return below.
  run_until = lock! - 2
  return if run_until < started_at

  begin
    while (real_now = Time.now.to_i) < run_until
      minutes = real_now / 60
      last = get_saved_minute!
      if last
        # Everything up to the current minute has already been published.
        break if minutes <= last
        # Catch up one minute at a time from the last published minute.
        minutes = last + 1
      end

      seconds = minutes * 60
      now = Time.at(seconds)

      attributes = {
        "epoch_seconds" => seconds,
        "epoch_minutes" => minutes,
        "epoch_hours"   => minutes / 60,
        "epoch_days"    => minutes / (60 * 24),
        "minute"        => now.min,
        "hour"          => now.hour,
        "day"           => now.day,
        "month"         => now.month,
        "year"          => now.year,
        "yday"          => now.yday,
        "wday"          => now.wday
      }

      ::QueueBus.publish("heartbeat_minutes", attributes)
      set_saved_minute!(minutes)
    end
  ensure
    released = unlock!
  end

  released
end
.redis_key ⇒ Object
42 43 44 |
# File 'lib/queue_bus/heartbeat.rb', line 42
#
# Redis key under which the last published epoch minute is stored.
#
# @return [String]
def redis_key
  "bus:heartbeat:timestamp"
end
.set_saved_minute!(epoch_minute) ⇒ Object
62 63 64 |
# File 'lib/queue_bus/heartbeat.rb', line 62
#
# Stores the given epoch minute in redis under redis_key.
#
# @param epoch_minute [Integer] the minute to record
# @return [Object] whatever the redis `set` call returns
def set_saved_minute!(epoch_minute)
  ::QueueBus.redis { |redis| redis.set(redis_key, epoch_minute) }
end
.unlock! ⇒ Object
37 38 39 |
# File 'lib/queue_bus/heartbeat.rb', line 37
#
# Releases the heartbeat lock by deleting lock_key from redis.
#
# @return [Object] whatever the redis `del` call returns
def unlock!
  ::QueueBus.redis { |redis| redis.del(lock_key) }
end