Class: Roundhouse::Monitor
- Inherits: Object
  - Object
  - Roundhouse::Monitor
- Defined in: lib/roundhouse/monitor.rb
Overview
This class implements two things:
1. A turntable semaphore: the fetcher can pop the next available
exclusive right to something (such as an API request with a given
auth token).
2. Tracking of which access rights are temporarily suspended.
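The two responsibilities can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `TurntableSketch` and its methods are hypothetical, plain Ruby collections stand in for the Redis structures, and none of it is Roundhouse's actual API.

```ruby
require 'set'

# Hypothetical in-memory stand-in for the Redis-backed turntable.
class TurntableSketch
  ACTIVE = 1
  EMPTY = 0
  SUSPENDED = -1

  def initialize
    @turntable = []            # queue ids waiting for a fetcher
    @in_rotation = Set.new     # ids currently sitting on the turntable
    @status = Hash.new(EMPTY)  # per-id status, EMPTY when unknown
  end

  # Put an id into rotation only if it is ACTIVE and not already there.
  def push(q_id)
    return nil unless @status[q_id] == ACTIVE
    return nil unless @in_rotation.add?(q_id)

    @turntable.unshift(q_id)
    @turntable.size
  end

  # Hand out the next exclusive right; an id that is no longer ACTIVE yields nil.
  def pop
    q_id = @turntable.pop
    return nil if q_id.nil?

    @in_rotation.delete(q_id)
    @status[q_id] == ACTIVE ? q_id : nil
  end

  def activate(q_id)
    @status[q_id] = ACTIVE
  end

  def suspend(q_id)
    @status[q_id] = SUSPENDED
  end
end

table = TurntableSketch.new
table.activate(1)
table.activate(2)
table.push(1)
table.push(2)
table.push(1)       # ignored: 1 is already in rotation
first = table.pop   # => 1, and nobody else can pop 1 until it is pushed back
table.suspend(2)
second = table.pop  # => nil, because 2 was suspended while waiting
```

Because an id sits on the turntable at most once, popping it amounts to holding the exclusive right until it is pushed back.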
Constant Summary
- ACTIVE =
1
- EMPTY =
0
- SUSPENDED =
-1
- TURNTABLE =
This helps catch problems with key names at runtime
'turntable'.freeze
- IN_ROTATION =
'inrotation'.freeze
- BUCKETS =
'buckets'.freeze
- QUEUE =
'queue'.freeze
- SCHEDULE =
'schedule'.freeze
- STATUS =
'status'.freeze
- TURNTABLE_TIMEOUT =
Number of seconds to block on turntable
5
- WOLVERINE_CONFIG =
class << self
Roundhouse::Script::Configuration.new
- LUA_TURNTABLE_PUSH =
<<END
  local status = redis.call("HGET", KEYS[1], ARGV[1])
  local in_rotation = redis.call("SISMEMBER", KEYS[3], ARGV[1])
  if status == "#{ACTIVE}" and in_rotation == 0 then
    redis.call("SADD", KEYS[3], ARGV[1])
    return redis.call("LPUSH", KEYS[2], ARGV[1])
  end
  return nil
END
- LUA_MAYBE_TURNTABLE_PUSH =
<<END
  local status = redis.call("HGET", KEYS[1], ARGV[1])
  if status == "#{EMPTY}" or status == nil or status == false then
    redis.call('SADD', KEYS[3], ARGV[2])
    redis.call("HSET", KEYS[1], ARGV[1], #{ACTIVE})
    local in_rotation = redis.call("SISMEMBER", KEYS[4], ARGV[1])
    if in_rotation == 0 then
      redis.call('SADD', KEYS[4], ARGV[1])
      return redis.call("LPUSH", KEYS[2], ARGV[1])
    end
  end
  return nil
END
- LUA_POP_JOB =
<<END
  local job = redis.call("RPOP", KEYS[1])
  if type(job) == "string" then
    return job
  end
  redis.call("HSET", KEYS[2], ARGV[1], #{EMPTY})
  return nil
END
- LUA_RESUME_QUEUE =
<<END
  -- KEYS: [1] status bucket, [2] turntable, [3] in-rotation set
  local status = redis.call("HGET", KEYS[1], ARGV[1])
  if status == "#{SUSPENDED}" then
    redis.call("HSET", KEYS[1], ARGV[1], #{ACTIVE})
    local in_rotation = redis.call("SISMEMBER", KEYS[3], ARGV[1])
    if in_rotation == 0 then
      redis.call('SADD', KEYS[3], ARGV[1])
      return redis.call("LPUSH", KEYS[2], ARGV[1])
    end
  end
  return nil
END
- TURNTABLE_PUSH =
Roundhouse::Script.new(LUA_TURNTABLE_PUSH.freeze, name: :turntable_push, config: WOLVERINE_CONFIG)
- MAYBE_TURNTABLE_PUSH =
Roundhouse::Script.new(LUA_MAYBE_TURNTABLE_PUSH.freeze, name: :maybe_turntable_push, config: WOLVERINE_CONFIG)
- POP_JOB =
Roundhouse::Script.new(LUA_POP_JOB.freeze, name: :pop_job, config: WOLVERINE_CONFIG)
- RESUME_QUEUE =
Roundhouse::Script.new(LUA_RESUME_QUEUE.freeze, name: :resume_queue, config: WOLVERINE_CONFIG)
Class Method Summary
- .activate(conn, q_id) ⇒ Object
- .bucket_num(q_id) ⇒ Object
- .maybe_add_to_rotation(conn, q_id) ⇒ Object
Only push onto the turntable if the status is empty. If empty, push into the turntable and set the status to active. Subsequent job pushes would see the active queue and not push into the turntable. Used after pushing a job, to conditionally put its queue into rotation.
- .maybe_next_job(conn) ⇒ Object
- .pop(conn) ⇒ Object
Find the first active queue. Return nil if nothing is there.
- .pop_job(conn, q_id) ⇒ Object
Atomically pop a job. If nothing is in the queue, set the queue status to empty.
- .push(conn, q_id) ⇒ Object
Atomic push. Push the queue into the turntable if and only if the queue status is active.
- .push_job(conn, payloads) ⇒ Object
- .queue_status(conn, q_id) ⇒ Object
- .rebuild_turntable! ⇒ Object
TODO: This needs to be in a Lua script and atomic. Need to test how well this scales too.
- .requeue(conn, q_id, jobs) ⇒ Object
Bulk requeue (push from right).
- .resume(conn, q_id) ⇒ Object
- .set_queue_is_empty(conn, q_id) ⇒ Object
- .status_bucket(q_id) ⇒ Object
- .suspend(conn, q_id) ⇒ Object
Class Method Details
.activate(conn, q_id) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 82
def activate(conn, q_id)
  set_queue_status(conn, q_id, ACTIVE)
end
.bucket_num(q_id) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 111
def bucket_num(q_id)
  q_id.to_i / 1000
end
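A worked example of the bucket arithmetic may help. The sketch below copies `bucket_num` and `status_bucket` into a hypothetical `BucketSketch` module so it can run without Roundhouse or Redis; the module name is an assumption made for illustration.

```ruby
# Standalone copies of the two helpers, for illustration only.
module BucketSketch
  STATUS = 'status'.freeze

  module_function

  # Integer division groups ids into buckets of 1000.
  def bucket_num(q_id)
    q_id.to_i / 1000
  end

  def status_bucket(q_id)
    "#{STATUS}:#{bucket_num(q_id)}"
  end
end

BucketSketch.bucket_num(999)        # => 0
BucketSketch.bucket_num(1000)       # => 1
BucketSketch.bucket_num('12345')    # => 12 (String#to_i parses the leading digits)
BucketSketch.bucket_num('abc')      # => 0  (non-numeric ids all land in bucket 0)
BucketSketch.status_bucket(12_345)  # => "status:12"
```

Each status hash therefore holds at most 1000 numeric queue ids, while ids that do not parse as integers all share bucket 0.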
.maybe_add_to_rotation(conn, q_id) ⇒ Object
Only push onto the turntable if the status is empty. If empty, push into the turntable and set the status to active. Subsequent job pushes would see the active queue and not push into the turntable. Used after pushing a job, to conditionally put its queue into rotation.
# File 'lib/roundhouse/monitor.rb', line 103
def maybe_add_to_rotation(conn, q_id)
  MAYBE_TURNTABLE_PUSH.call conn, [status_bucket(q_id), TURNTABLE, BUCKETS, IN_ROTATION], [q_id, bucket_num(q_id)]
end
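The effect of this call can be modelled in memory. The sketch below mirrors the branches of LUA_MAYBE_TURNTABLE_PUSH with plain Ruby collections; `RotationSketch` is a hypothetical name, statuses are Integers rather than the strings Redis stores, and the real method runs the Lua script atomically inside Redis.

```ruby
require 'set'

# Hypothetical in-memory model of LUA_MAYBE_TURNTABLE_PUSH; not the real script runner.
class RotationSketch
  ACTIVE = 1
  EMPTY = 0

  attr_reader :turntable, :status, :buckets

  def initialize
    @turntable = []
    @in_rotation = Set.new
    @buckets = Set.new
    @status = {}
  end

  def maybe_add_to_rotation(q_id)
    current = @status[q_id]
    # Only an empty (or never seen) queue is put into rotation.
    return nil unless current.nil? || current == EMPTY

    @buckets.add(q_id.to_i / 1000)
    @status[q_id] = ACTIVE
    return nil unless @in_rotation.add?(q_id)

    @turntable.unshift(q_id)
    @turntable.size  # LPUSH replies with the new list length
  end
end

sketch = RotationSketch.new
first = sketch.maybe_add_to_rotation(2001)   # => 1
second = sketch.maybe_add_to_rotation(2001)  # => nil, the queue is already active
```

The second call is a no-op, which is what keeps repeated job pushes from adding the same queue to the turntable twice.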
.maybe_next_job(conn) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 49
def maybe_next_job(conn)
  queue_id = pop(conn)
  return nil unless queue_id

  job = pop_job(conn, queue_id)
  return queue_id, job if job
  return nil
end
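In Ruby, `return queue_id, job` returns a two-element Array, and destructuring a bare `nil` leaves both variables `nil`, so a caller can handle both outcomes with one assignment. The loop below is a sketch of that calling pattern; `fake_next_job` and the job string are hypothetical stand-ins for a real call to `Roundhouse::Monitor.maybe_next_job(conn)`.

```ruby
# Hypothetical stand-in for maybe_next_job: one job available, then nothing.
results = [[7, '{"class":"Hello"}'], nil]
fake_next_job = -> { results.shift }

processed = []
2.times do
  queue_id, job = fake_next_job.call
  if job
    processed << [queue_id, job]
  else
    # Nothing to do this round: queue_id and job are both nil here.
    processed << :idle
  end
end

processed  # => [[7, "{\"class\":\"Hello\"}"], :idle]
```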
.pop(conn) ⇒ Object
Find the first active queue. Return nil if nothing is there. The fetcher is responsible for polling.
# File 'lib/roundhouse/monitor.rb', line 29
def pop(conn)
  _, q_id = conn.brpop(TURNTABLE, TURNTABLE_TIMEOUT)
  conn.srem(IN_ROTATION, q_id)
  return q_id if queue_status(conn, q_id) == ACTIVE
  return nil
end
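When the blocking pop times out, redis-rb's `brpop` returns `nil`, so the destructured `q_id` is `nil` as well. The sketch below shows that case with two hypothetical connection stubs and a caller-side guard; `TimedOutConn`, `ReadyConn`, and `next_queue_id` are assumptions for illustration, not part of Roundhouse.

```ruby
# Hypothetical stub: behaves like BRPOP when the turntable stays empty.
class TimedOutConn
  def brpop(_key, _timeout)
    nil
  end
end

# Hypothetical stub: behaves like BRPOP when an id is available.
class ReadyConn
  def brpop(key, _timeout)
    [key, '42']
  end
end

# A guard that stops early on a timeout instead of issuing more commands.
def next_queue_id(conn)
  _, q_id = conn.brpop('turntable', 5)
  return nil if q_id.nil?

  q_id
end

next_queue_id(TimedOutConn.new)  # => nil
next_queue_id(ReadyConn.new)     # => "42"
```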
.pop_job(conn, q_id) ⇒ Object
Atomically pop a job. If nothing is in the queue, set the queue status to empty.
# File 'lib/roundhouse/monitor.rb', line 60
def pop_job(conn, q_id)
  POP_JOB.call conn, ["#{QUEUE}:#{q_id}", status_bucket(q_id)], [q_id]
end
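The pop-or-mark-empty behaviour of LUA_POP_JOB can be sketched with a Hash of Arrays. `PopJobSketch` is a hypothetical in-memory model; the real script performs both steps atomically in Redis.

```ruby
# Hypothetical in-memory model of LUA_POP_JOB.
module PopJobSketch
  ACTIVE = 1
  EMPTY = 0

  module_function

  # queues: { q_id => [jobs] }, status: { q_id => Integer }
  def pop_job(queues, status, q_id)
    job = (queues[q_id] || []).pop  # RPOP takes from the right end
    return job if job.is_a?(String)

    status[q_id] = EMPTY
    nil
  end
end

queues = { 5 => ['job-b', 'job-a'] }  # LPUSH order: job-a was pushed first
status = { 5 => PopJobSketch::ACTIVE }

a = PopJobSketch.pop_job(queues, status, 5)  # => "job-a"
b = PopJobSketch.pop_job(queues, status, 5)  # => "job-b"
c = PopJobSketch.pop_job(queues, status, 5)  # => nil, and status[5] is now EMPTY
```

Pushing on the left and popping on the right gives first-in, first-out order within a queue.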
.push(conn, q_id) ⇒ Object
Atomic push. Push the queue into the turntable if and only if the queue status is active.
# File 'lib/roundhouse/monitor.rb', line 38
def push(conn, q_id)
  # NOTE: this version of redis-namespace has a bug when you do keys: argv: params
  TURNTABLE_PUSH.call conn, [status_bucket(q_id), TURNTABLE, IN_ROTATION], [q_id]
end
.push_job(conn, payloads) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 64
def push_job(conn, payloads)
  return schedule(conn, payloads) if payloads.first['at']

  q_id = payloads.first['queue_id']
  now = Time.now.to_f
  to_push = payloads.map do |entry|
    entry['enqueued_at'.freeze] = now
    Roundhouse.dump_json(entry)
  end
  conn.lpush("#{QUEUE}:#{q_id}", to_push)

  maybe_add_to_rotation(conn, q_id)
end
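The serialization step can be tried without Redis. In the sketch below the payload fields other than `'queue_id'` and `'enqueued_at'` are hypothetical, and `JSON.generate` is used as a stand-in for `Roundhouse.dump_json`, which is an assumption about what that helper produces.

```ruby
require 'json'

# Hypothetical payloads; 'class' and 'args' are illustrative fields only.
payloads = [
  { 'queue_id' => 42, 'class' => 'HardWorker', 'args' => [1] },
  { 'queue_id' => 42, 'class' => 'HardWorker', 'args' => [2] }
]

now = Time.now.to_f
to_push = payloads.map do |entry|
  entry['enqueued_at'] = now  # every entry in the batch shares one timestamp
  JSON.generate(entry)        # stand-in for Roundhouse.dump_json (assumption)
end

# The whole batch goes to the queue named by the first payload.
key = "queue:#{payloads.first['queue_id']}"  # => "queue:42"
```

Since the key is derived from `payloads.first` alone, callers would presumably batch only payloads that share a `'queue_id'`.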
.queue_status(conn, q_id) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 94
def queue_status(conn, q_id)
  conn.hget(status_bucket(q_id), q_id).to_i || EMPTY
end
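A note on the `|| EMPTY` fallback: in Ruby `nil.to_i` is `0` and `0` is truthy, so the expression already evaluates to `0` (which equals EMPTY) for a missing field and the right-hand side is never reached. The sketch below reproduces just that expression; `StatusSketch` and `status_from` are hypothetical names.

```ruby
module StatusSketch
  EMPTY = 0

  module_function

  # raw is what HGET returns: a String, or nil for an unknown field.
  def status_from(raw)
    raw.to_i || EMPTY
  end
end

StatusSketch.status_from(nil)     # => 0  (EMPTY, via nil.to_i)
StatusSketch.status_from('1')     # => 1  (ACTIVE)
StatusSketch.status_from('-1')    # => -1 (SUSPENDED)
StatusSketch.status_from('oops')  # => 0  (unparseable strings also read as EMPTY)
```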
.rebuild_turntable! ⇒ Object
TODO: This needs to be in a Lua script and atomic. Need to test how well this scales too. Should be tested with about 10,000 queues.
# File 'lib/roundhouse/monitor.rb', line 118
def rebuild_turntable!
  Roundhouse.redis do |conn|
    buckets = conn.smembers(BUCKETS)
    queues = conn.pipelined do
      buckets.each { |bucket| conn.hgetall("#{STATUS}:#{bucket}") }
    end

    all_queue_ids = queues.map(&:keys).flatten
    queue_len_res = conn.pipelined do
      all_queue_ids.each { |queue| conn.llen("#{QUEUE}:#{queue}") }
    end

    queue_len = all_queue_ids.zip(queue_len_res)
    status = queues.inject({}) { |a,x| a.merge(x) }

    conn.multi do
      conn.del(TURNTABLE)
      queue_len.each do |(q_id, len)|
        s = status[q_id].to_i

        case s
        when SUSPENDED then next
        when ACTIVE then
          if len > 0
            conn.lpush(TURNTABLE, q_id)
            conn.sadd(IN_ROTATION, q_id)
          else
            set_queue_status(conn, q_id, EMPTY, false)
          end
        when EMPTY then
          next if len <= 0
          conn.lpush(TURNTABLE, q_id)
          conn.sadd(IN_ROTATION, q_id)
          set_queue_status(conn, q_id, ACTIVE, false)
        else
          set_queue_status(conn, q_id, SUSPENDED, false)
        end
      end
    end
  end
end
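The per-queue decision inside the `multi` block reduces to a small table of (status, length) cases. The sketch below expresses it as a pure function; `RebuildSketch.decide` is a hypothetical helper that returns the resulting status and whether the queue ends up on the turntable, with no Redis calls involved.

```ruby
module RebuildSketch
  ACTIVE = 1
  EMPTY = 0
  SUSPENDED = -1

  module_function

  # Returns [new_status, on_turntable] for one queue.
  def decide(status, len)
    case status
    when SUSPENDED then [SUSPENDED, false]
    when ACTIVE    then len > 0 ? [ACTIVE, true] : [EMPTY, false]
    when EMPTY     then len > 0 ? [ACTIVE, true] : [EMPTY, false]
    else [SUSPENDED, false]  # unknown status values are suspended
    end
  end
end

RebuildSketch.decide(RebuildSketch::ACTIVE, 0)     # => [0, false]
RebuildSketch.decide(RebuildSketch::EMPTY, 3)      # => [1, true]
RebuildSketch.decide(RebuildSketch::SUSPENDED, 5)  # => [-1, false]
RebuildSketch.decide(7, 1)                         # => [-1, false]
```

Seen this way, a rebuild puts a queue on the turntable exactly when it is not suspended and still has jobs.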
.requeue(conn, q_id, jobs) ⇒ Object
Bulk requeue (push from right). Usually done via Client, when Roundhouse is terminating.
# File 'lib/roundhouse/monitor.rb', line 45
def requeue(conn, q_id, jobs)
  conn.rpush("#{QUEUE}:#{q_id}", jobs)
end
.resume(conn, q_id) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 90
def resume(conn, q_id)
  RESUME_QUEUE.call conn, [status_bucket(q_id), TURNTABLE, IN_ROTATION], [q_id]
end
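The intended state transition of a resume can be sketched in memory: a suspended queue becomes active and is put back into rotation, and any other status is left alone. `ResumeSketch` is a hypothetical model that uses Integers and plain collections in place of the Redis hash, set, and list.

```ruby
require 'set'

# Hypothetical in-memory model of the resume transition.
module ResumeSketch
  ACTIVE = 1
  SUSPENDED = -1

  module_function

  def resume(status, in_rotation, turntable, q_id)
    return nil unless status[q_id] == SUSPENDED

    status[q_id] = ACTIVE
    return nil unless in_rotation.add?(q_id)

    turntable.unshift(q_id)
    turntable.size
  end
end

status = { 9 => ResumeSketch::SUSPENDED }
in_rotation = Set.new
turntable = []

first = ResumeSketch.resume(status, in_rotation, turntable, 9)   # => 1
second = ResumeSketch.resume(status, in_rotation, turntable, 9)  # => nil, already ACTIVE
```

A resumed queue goes back on the turntable even if it holds no jobs; in that case the next `pop_job` would mark it empty.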
.set_queue_is_empty(conn, q_id) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 78
def set_queue_is_empty(conn, q_id)
  set_queue_status(conn, q_id, EMPTY)
end
.status_bucket(q_id) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 107
def status_bucket(q_id)
  "#{STATUS}:#{bucket_num(q_id)}"
end
.suspend(conn, q_id) ⇒ Object
# File 'lib/roundhouse/monitor.rb', line 86
def suspend(conn, q_id)
  set_queue_status(conn, q_id, SUSPENDED)
end