Class: Roundhouse::Monitor
- Inherits:
-
Object
- Object
- Roundhouse::Monitor
- Defined in:
- lib/roundhouse/monitor.rb
Overview
This class implements two things:
1. A queueing semaphore - the fetcher can pop the next available
exclusive right to something (such as API request with a given
auth token)
2. Track which access right is temporarily suspended
Constant Summary collapse
- ACTIVE =
1- EMPTY =
0- SUSPENDED =
-1
- SEMAPHORE =
This helps catch problems with key names at runtime
'semaphore'.freeze
- BUCKETS =
'buckets'.freeze
- QUEUE =
'queue'.freeze
- SCHEDULE =
'schedule'.freeze
- STATUS =
'status'.freeze
Class Method Summary collapse
- .activate(conn, q_id) ⇒ Object
- .await_next_job(conn) ⇒ Object
- .bucket_num(q_id) ⇒ Object
- .maybe_add_to_rotation(conn, q_id) ⇒ Object
-
.pop(conn) ⇒ Object
Find the first active queue If nothing is in the rotation, then block.
- .pop_job(conn, q_id) ⇒ Object
- .push(conn, q_id) ⇒ Object
- .push_job(conn, payloads) ⇒ Object
- .queue_status(conn, q_id) ⇒ Object
-
.requeue(conn, q_id, jobs) ⇒ Object
Bulk requeue (push from right).
- .resume(conn, q_id) ⇒ Object
- .set_queue_is_empty(conn, q_id) ⇒ Object
- .status_bucket(q_id) ⇒ Object
- .suspend(conn, q_id) ⇒ Object
Class Method Details
.activate(conn, q_id) ⇒ Object
71 72 73 |
# File 'lib/roundhouse/monitor.rb', line 71 def activate(conn, q_id) set_queue_status(conn, q_id, ACTIVE) end |
.await_next_job(conn) ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/roundhouse/monitor.rb', line 40 def await_next_job(conn) loop do queue_id = pop(conn) job = pop_job(conn, queue_id) return queue_id, job if job Roundhouse::Monitor.set_queue_is_empty(conn, queue_id) end end |
.bucket_num(q_id) ⇒ Object
105 106 107 |
# File 'lib/roundhouse/monitor.rb', line 105 def bucket_num(q_id) q_id.to_i / 1000 end |
.maybe_add_to_rotation(conn, q_id) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/roundhouse/monitor.rb', line 89 def maybe_add_to_rotation(conn, q_id) # NOTE: this really should be written in LUA to make # sure this is set to ACTIVE after pushing it into the # queuing semaphore. Otherwise, race conditions might # creep in giving this queue an unfair advantage. # See: https://github.com/resque/redis-namespace/blob/master/lib/redis/namespace.rb#L403-L413 # See: https://www.redisgreen.net/blog/intro-to-lua-for-redis-programmers/ return false unless queue_status(conn, q_id) == EMPTY activate(conn, q_id) conn.lpush(SEMAPHORE, q_id) end |
.pop(conn) ⇒ Object
Find the first active queue If nothing is in the rotation, then block
22 23 24 25 26 27 |
# File 'lib/roundhouse/monitor.rb', line 22 def pop(conn) loop do _, q_id = conn.brpop(SEMAPHORE) return q_id if queue_status(conn, q_id) == ACTIVE end end |
.pop_job(conn, q_id) ⇒ Object
49 50 51 |
# File 'lib/roundhouse/monitor.rb', line 49 def pop_job(conn, q_id) conn.rpop("#{QUEUE}:#{q_id}") end |
.push(conn, q_id) ⇒ Object
29 30 31 32 |
# File 'lib/roundhouse/monitor.rb', line 29 def push(conn, q_id) return unless queue_status(conn, q_id) == ACTIVE conn.lpush(SEMAPHORE, q_id) end |
.push_job(conn, payloads) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/roundhouse/monitor.rb', line 53 def push_job(conn, payloads) return schedule(conn, payloads) if payloads.first['at'] q_id = payloads.first['queue_id'] now = Time.now.to_f to_push = payloads.map do |entry| entry['enqueued_at'.freeze] = now Roundhouse.dump_json(entry) end conn.lpush("#{QUEUE}:#{q_id}", to_push) maybe_add_to_rotation(conn, q_id) end |
.queue_status(conn, q_id) ⇒ Object
85 86 87 |
# File 'lib/roundhouse/monitor.rb', line 85 def queue_status(conn, q_id) conn.hget(status_bucket(q_id), q_id).to_i || EMPTY end |
.requeue(conn, q_id, jobs) ⇒ Object
Bulk requeue (push from right). Usually done via Client, when Roundhouse is terminating
36 37 38 |
# File 'lib/roundhouse/monitor.rb', line 36 def requeue(conn, q_id, jobs) conn.rpush("#{QUEUE}:#{q_id}", jobs) end |
.resume(conn, q_id) ⇒ Object
79 80 81 82 83 |
# File 'lib/roundhouse/monitor.rb', line 79 def resume(conn, q_id) return unless queue_status(conn, q_id) == SUSPENDED set_queue_status(conn, q_id, ACTIVE) conn.lpush(SEMAPHORE, q_id) end |
.set_queue_is_empty(conn, q_id) ⇒ Object
67 68 69 |
# File 'lib/roundhouse/monitor.rb', line 67 def set_queue_is_empty(conn, q_id) set_queue_status(conn, q_id, EMPTY) end |
.status_bucket(q_id) ⇒ Object
101 102 103 |
# File 'lib/roundhouse/monitor.rb', line 101 def status_bucket(q_id) "#{STATUS}:#{bucket_num(q_id)}" end |
.suspend(conn, q_id) ⇒ Object
75 76 77 |
# File 'lib/roundhouse/monitor.rb', line 75 def suspend(conn, q_id) set_queue_status(conn, q_id, SUSPENDED) end |