Class: Roundhouse::Monitor

Inherits:
Object
  • Object
show all
Defined in:
lib/roundhouse/monitor.rb

Overview

This class implements two things:

1. A queueing semaphore - the fetcher can pop the next available
   exclusive right to something (such as an API request with a given
   auth token)
2. Tracking of which access rights are temporarily suspended

Constant Summary collapse

ACTIVE =
1
EMPTY =
0
SUSPENDED =
-1
SEMAPHORE =

This helps catch problems with key names at runtime

'semaphore'.freeze
BUCKETS =
'buckets'.freeze
QUEUE =
'queue'.freeze
SCHEDULE =
'schedule'.freeze
STATUS =
'status'.freeze

Class Method Summary collapse

Class Method Details

.activate(conn, q_id) ⇒ Object



71
72
73
# File 'lib/roundhouse/monitor.rb', line 71

# Records the ACTIVE status for a queue.
#
# @param conn connection handle, forwarded untouched to set_queue_status
# @param q_id identifier of the queue whose status is written
# @return whatever set_queue_status returns
def activate(conn, q_id)
  new_status = ACTIVE
  set_queue_status(conn, q_id, new_status)
end

.await_next_job(conn) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/roundhouse/monitor.rb', line 40

def await_next_job(conn)
  loop do
    queue_id = pop(conn)
    job = pop_job(conn, queue_id)
    return queue_id, job if job
    Roundhouse::Monitor.set_queue_is_empty(conn, queue_id)
  end
end

.bucket_num(q_id) ⇒ Object



105
106
107
# File 'lib/roundhouse/monitor.rb', line 105

# Maps a queue id to the number of the bucket it belongs to.
#
# The id is coerced with to_i and integer-divided by 1000, so ids
# 0..999 land in bucket 0, 1000..1999 in bucket 1, and so on.
#
# @param q_id queue identifier; anything responding to to_i
# @return [Integer] the bucket number
def bucket_num(q_id)
  numeric_id = q_id.to_i
  numeric_id.div(1000)
end

.maybe_add_to_rotation(conn, q_id) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/roundhouse/monitor.rb', line 89

# Puts a queue into the rotation, but only when its current status is EMPTY.
#
# When the status is EMPTY the queue is activated and its id is pushed
# onto the SEMAPHORE list; for any other status nothing is changed.
#
# NOTE: this really should be written in Lua to make sure the status is
# set to ACTIVE after the id is pushed into the queuing semaphore.
# Otherwise, race conditions might creep in, giving this queue an
# unfair advantage.
# See: https://github.com/resque/redis-namespace/blob/master/lib/redis/namespace.rb#L403-L413
# See: https://www.redisgreen.net/blog/intro-to-lua-for-redis-programmers/
#
# @param conn connection handle; must respond to lpush
# @param q_id identifier of the queue to add
# @return false when the queue is not EMPTY, otherwise the result of lpush
def maybe_add_to_rotation(conn, q_id)
  current_status = queue_status(conn, q_id)
  if current_status == EMPTY
    activate(conn, q_id)
    conn.lpush(SEMAPHORE, q_id)
  else
    false
  end
end

.pop(conn) ⇒ Object

Find the first active queue. If nothing is in the rotation, then block.



22
23
24
25
26
27
# File 'lib/roundhouse/monitor.rb', line 22

# Takes queue ids off the SEMAPHORE list until one is found whose
# status is ACTIVE, and returns that id. Ids with any other status
# are dropped and the next one is fetched.
#
# @param conn connection handle; must respond to brpop
# @return the first popped queue id whose status is ACTIVE
def pop(conn)
  loop do
    # brpop's reply is destructured as a pair; only the second element
    # (the popped value) is used.
    _list_key, candidate = conn.brpop(SEMAPHORE)
    break candidate if queue_status(conn, candidate) == ACTIVE
  end
end

.pop_job(conn, q_id) ⇒ Object



49
50
51
# File 'lib/roundhouse/monitor.rb', line 49

# Pops one entry from the right end of the job list of the given queue.
#
# The list key is QUEUE and the queue id joined by a colon.
#
# @param conn connection handle; must respond to rpop
# @param q_id identifier of the queue to read from
# @return whatever rpop returns for that key
def pop_job(conn, q_id)
  list_key = "#{QUEUE}:#{q_id}"
  conn.rpop(list_key)
end

.push(conn, q_id) ⇒ Object



29
30
31
32
# File 'lib/roundhouse/monitor.rb', line 29

# Pushes a queue id onto the SEMAPHORE list, provided the queue's
# status is ACTIVE. For any other status nothing is pushed.
#
# @param conn connection handle; must respond to lpush
# @param q_id identifier of the queue to put back into the rotation
# @return the result of lpush, or nil when the queue is not ACTIVE
def push(conn, q_id)
  conn.lpush(SEMAPHORE, q_id) if queue_status(conn, q_id) == ACTIVE
end

.push_job(conn, payloads) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/roundhouse/monitor.rb', line 53

# Enqueues a batch of job payloads.
#
# An empty (or nil) batch is a no-op and returns false; previously it
# raised NoMethodError because the first payload was read unconditionally.
#
# When the first payload carries an 'at' value, the whole batch is
# handed to schedule. Otherwise every payload is stamped with the
# current time under 'enqueued_at' (the payload hashes are modified in
# place), serialized with Roundhouse.dump_json, and pushed onto the job
# list of the queue named by the first payload's 'queue_id'. Finally
# the queue is offered to the rotation.
#
# @param conn connection handle; must respond to lpush
# @param payloads [Array<Hash>] job payloads; the first one decides the
#   target queue and whether the batch is scheduled
# @return false for an empty batch, the result of schedule for a
#   scheduled batch, otherwise the result of maybe_add_to_rotation
def push_job(conn, payloads)
  # Guard: nothing to push, and payloads.first would be nil below.
  return false if payloads.nil? || payloads.empty?
  return schedule(conn, payloads) if payloads.first['at']

  q_id = payloads.first['queue_id']
  now = Time.now.to_f
  to_push = payloads.map do |entry|
    entry['enqueued_at'.freeze] = now
    Roundhouse.dump_json(entry)
  end
  conn.lpush("#{QUEUE}:#{q_id}", to_push)

  maybe_add_to_rotation(conn, q_id)
end

.queue_status(conn, q_id) ⇒ Object



85
86
87
# File 'lib/roundhouse/monitor.rb', line 85

# Reads the stored status of a queue.
#
# The status lives in the hash named by status_bucket(q_id), under the
# field q_id. A missing entry is reported as EMPTY; any stored value is
# converted with to_i.
#
# The previous form, `hget(...).to_i || EMPTY`, had an unreachable
# fallback: to_i always returns an Integer, which is truthy. The
# missing-entry case is now explicit.
#
# @param conn connection handle; must respond to hget
# @param q_id identifier of the queue to look up
# @return [Integer] the stored status, or EMPTY when none is stored
def queue_status(conn, q_id)
  stored = conn.hget(status_bucket(q_id), q_id)
  stored.nil? ? EMPTY : stored.to_i
end

.requeue(conn, q_id, jobs) ⇒ Object

Bulk requeue (push from right). Usually done via Client, when Roundhouse is terminating



36
37
38
# File 'lib/roundhouse/monitor.rb', line 36

# Bulk requeue: appends the given jobs to the right end of the queue's
# job list (key: QUEUE and the queue id joined by a colon).
# Usually done via Client, when Roundhouse is terminating.
#
# @param conn connection handle; must respond to rpush
# @param q_id identifier of the queue receiving the jobs
# @param jobs the job entries to push, passed to rpush as given
# @return whatever rpush returns
def requeue(conn, q_id, jobs)
  list_key = "#{QUEUE}:#{q_id}"
  conn.rpush(list_key, jobs)
end

.resume(conn, q_id) ⇒ Object



79
80
81
82
83
# File 'lib/roundhouse/monitor.rb', line 79

# Brings a suspended queue back into the rotation.
#
# Only acts when the queue's status is SUSPENDED: the status is set to
# ACTIVE and the queue id is pushed onto the SEMAPHORE list. For any
# other status nothing happens.
#
# @param conn connection handle; must respond to lpush
# @param q_id identifier of the queue to resume
# @return the result of lpush, or nil when the queue was not SUSPENDED
def resume(conn, q_id)
  if queue_status(conn, q_id) == SUSPENDED
    set_queue_status(conn, q_id, ACTIVE)
    conn.lpush(SEMAPHORE, q_id)
  end
end

.set_queue_is_empty(conn, q_id) ⇒ Object



67
68
69
# File 'lib/roundhouse/monitor.rb', line 67

# Records the EMPTY status for a queue.
#
# @param conn connection handle, forwarded untouched to set_queue_status
# @param q_id identifier of the queue whose status is written
# @return whatever set_queue_status returns
def set_queue_is_empty(conn, q_id)
  new_status = EMPTY
  set_queue_status(conn, q_id, new_status)
end

.status_bucket(q_id) ⇒ Object



101
102
103
# File 'lib/roundhouse/monitor.rb', line 101

# Builds the name of the hash that holds the status of the given queue:
# STATUS and the queue's bucket number (see bucket_num) joined by a colon.
#
# @param q_id identifier of the queue
# @return [String] the status hash key for that queue's bucket
def status_bucket(q_id)
  bucket = bucket_num(q_id)
  "#{STATUS}:#{bucket}"
end

.suspend(conn, q_id) ⇒ Object



75
76
77
# File 'lib/roundhouse/monitor.rb', line 75

# Records the SUSPENDED status for a queue.
#
# @param conn connection handle, forwarded untouched to set_queue_status
# @param q_id identifier of the queue whose status is written
# @return whatever set_queue_status returns
def suspend(conn, q_id)
  new_status = SUSPENDED
  set_queue_status(conn, q_id, new_status)
end