Class: Roundhouse::Monitor

Inherits:
Object
  • Object
show all
Defined in:
lib/roundhouse/monitor.rb

Overview

This class implements two things:

1. A turntable semaphore - the fetcher can pop the next available
   exclusive right to something (such as API request with a given
   auth token)
2. Track which access right is temporarily suspended

Constant Summary collapse

# Queue status values, stored per queue id in the status hashes.
# ACTIVE: the Lua scripts only put a queue on the turntable while (or when
# switching to) this status.
ACTIVE =
1
# EMPTY: written by LUA_POP_JOB when a queue has no job left to pop.
EMPTY =
0
# SUSPENDED: rebuild_turntable! leaves such queues off the turntable and
# LUA_RESUME_QUEUE is the script that switches them back to ACTIVE.
SUSPENDED =
-1
TURNTABLE =

This helps catch problems with key names at runtime

'turntable'.freeze
# Key of the set holding the queue ids that are currently on the turntable.
IN_ROTATION =
'inrotation'.freeze
# Key of the set of status bucket numbers (see bucket_num).
BUCKETS =
'buckets'.freeze
# Prefix of the per-queue job list keys ("queue:<q_id>").
QUEUE =
'queue'.freeze
# Not referenced by the code visible here; presumably the key used for
# scheduled jobs -- TODO confirm.
SCHEDULE =
'schedule'.freeze
# Prefix of the per-bucket status hash keys ("status:<bucket number>").
STATUS =
'status'.freeze
TURNTABLE_TIMEOUT =

Number of seconds to block on turntable

5
WOLVERINE_CONFIG =

class << self

Roundhouse::Script::Configuration.new
# Lua source used by `push`.
# KEYS[1] = status hash of the queue's bucket, KEYS[2] = turntable list,
# KEYS[3] = in-rotation set; ARGV[1] = queue id.
# Pushes the queue id onto the turntable (and records it in the in-rotation
# set) only when its status is ACTIVE and it is not already in rotation.
# Returns the LPUSH reply in that case, nil otherwise.
LUA_TURNTABLE_PUSH =
<<END
local status = redis.call("HGET", KEYS[1], ARGV[1])
local in_rotation = redis.call("SISMEMBER", KEYS[3], ARGV[1])
if status == "#{ACTIVE}" and in_rotation == 0 then
  redis.call("SADD", KEYS[3], ARGV[1])
  return redis.call("LPUSH", KEYS[2], ARGV[1])
end
return nil
END
# Lua source used by `maybe_add_to_rotation`.
# KEYS[1] = status hash of the queue's bucket, KEYS[2] = turntable list,
# KEYS[3] = buckets set, KEYS[4] = in-rotation set;
# ARGV[1] = queue id, ARGV[2] = bucket number.
# When the queue's status is EMPTY or missing: registers the bucket number,
# marks the queue ACTIVE and, unless it is already in rotation, adds it to the
# in-rotation set and pushes it onto the turntable (returning the LPUSH reply).
# In every other case it returns nil.
LUA_MAYBE_TURNTABLE_PUSH =
<<END
local status = redis.call("HGET", KEYS[1], ARGV[1])
if status == "#{EMPTY}" or status == nil or status == false then
  redis.call('SADD', KEYS[3], ARGV[2])
  redis.call("HSET", KEYS[1], ARGV[1], #{ACTIVE})
  local in_rotation = redis.call("SISMEMBER", KEYS[4], ARGV[1])
  if in_rotation == 0 then
    redis.call('SADD', KEYS[4], ARGV[1])
    return redis.call("LPUSH", KEYS[2], ARGV[1])
  end
end
return nil
END
# Lua source used by `pop_job`.
# KEYS[1] = job list of the queue, KEYS[2] = status hash of the queue's
# bucket; ARGV[1] = queue id.
# Pops one job from the right end of the list and returns it. When the pop
# yields no string (nothing to pop), the queue's status is set to EMPTY and
# nil is returned.
LUA_POP_JOB =
<<END
local job = redis.call("RPOP", KEYS[1])
if type(job) == "string" then
  return job
end
redis.call("HSET", KEYS[2], ARGV[1], #{EMPTY})
return nil
END
# Lua source used by `resume`.
# KEYS[1] = status hash of the queue's bucket, KEYS[2] = turntable list,
# KEYS[3] = in-rotation set; ARGV[1] = queue id.
# When the queue's status is SUSPENDED it is switched to ACTIVE and, unless it
# is already in rotation, added to the in-rotation set and pushed onto the
# turntable (returning the LPUSH reply). In every other case it returns nil.
#
# Only three keys are passed by `resume`, so the in-rotation set must be
# addressed as KEYS[3] for both the membership check and the SADD.
LUA_RESUME_QUEUE =
<<END
local status = redis.call("HGET", KEYS[1], ARGV[1])
if status == "#{SUSPENDED}" then
  redis.call("HSET", KEYS[1], ARGV[1], #{ACTIVE})
  local in_rotation = redis.call("SISMEMBER", KEYS[3], ARGV[1])
  if in_rotation == 0 then
    redis.call('SADD', KEYS[3], ARGV[1])
    return redis.call("LPUSH", KEYS[2], ARGV[1])
  end
end
return nil
END
# Script objects built from the Lua sources above. Each one is given a name and
# the shared WOLVERINE_CONFIG; the methods of this class invoke them with
# `.call conn, keys, argv`.
# NOTE(review): Roundhouse::Script is defined elsewhere -- how it loads/caches
# the script in Redis cannot be told from here.
TURNTABLE_PUSH =
Roundhouse::Script.new(LUA_TURNTABLE_PUSH.freeze, name: :turntable_push, config: WOLVERINE_CONFIG)
MAYBE_TURNTABLE_PUSH =
Roundhouse::Script.new(LUA_MAYBE_TURNTABLE_PUSH.freeze, name: :maybe_turntable_push, config: WOLVERINE_CONFIG)
POP_JOB =
Roundhouse::Script.new(LUA_POP_JOB.freeze, name: :pop_job, config: WOLVERINE_CONFIG)
RESUME_QUEUE =
Roundhouse::Script.new(LUA_RESUME_QUEUE.freeze, name: :resume_queue, config: WOLVERINE_CONFIG)

Class Method Summary collapse

Class Method Details

.activate(conn, q_id) ⇒ Object



82
83
84
# File 'lib/roundhouse/monitor.rb', line 82

# Flags the given queue as ACTIVE by delegating to set_queue_status.
#
# @param conn connection object handed through to set_queue_status
# @param q_id identifier of the queue to mark
# @return whatever set_queue_status returns
def activate(conn, q_id)
  new_status = ACTIVE
  set_queue_status(conn, q_id, new_status)
end

.bucket_num(q_id) ⇒ Object



111
112
113
# File 'lib/roundhouse/monitor.rb', line 111

# Maps a queue id to the number of the status bucket it belongs to: the id is
# coerced with to_i and integer-divided by 1000, so ids 0..999 share bucket 0,
# 1000..1999 share bucket 1, and so on.
#
# @param q_id queue identifier (anything responding to to_i)
# @return [Integer] bucket number
def bucket_num(q_id)
  numeric_id = q_id.to_i
  numeric_id.div(1000)
end

.maybe_add_to_rotation(conn, q_id) ⇒ Object

Only push onto the turntable if the status is empty. If empty, push into the turntable and set the status to active. Subsequent job pushes will see the active queue and not push into the turntable. Used after pushing a job, to conditionally put its queue into rotation.



103
104
105
# File 'lib/roundhouse/monitor.rb', line 103

# Runs the MAYBE_TURNTABLE_PUSH script for the given queue.
#
# Keys passed, in order: the queue's status bucket, the turntable, the buckets
# set and the in-rotation set. Arguments passed: the queue id and its bucket
# number.
#
# @param conn connection object the script is called with
# @param q_id queue identifier
# @return the script's reply
def maybe_add_to_rotation(conn, q_id)
  keys = [status_bucket(q_id), TURNTABLE, BUCKETS, IN_ROTATION]
  argv = [q_id, bucket_num(q_id)]
  MAYBE_TURNTABLE_PUSH.call(conn, keys, argv)
end

.maybe_next_job(conn) ⇒ Object



49
50
51
52
53
54
55
56
# File 'lib/roundhouse/monitor.rb', line 49

# Takes the next queue off the turntable (via pop) and tries to pop one job
# from it (via pop_job).
#
# @param conn connection object passed to pop and pop_job
# @return [Array, nil] a two-element array [queue id, job] when both a queue
#   and a job were obtained; nil otherwise
def maybe_next_job(conn)
  q_id = pop(conn)
  if q_id
    payload = pop_job(conn, q_id)
    [q_id, payload] if payload
  end
end

.pop(conn) ⇒ Object

Find the first active queue. Return nil if nothing is there. The fetcher is responsible for polling.



29
30
31
32
33
34
# File 'lib/roundhouse/monitor.rb', line 29

# Blocks on the turntable for up to TURNTABLE_TIMEOUT seconds and takes the
# next queue id off it.
#
# The id is removed from the in-rotation set as soon as it leaves the
# turntable, and is only handed back to the caller when the queue's status is
# ACTIVE.
#
# @param conn connection object used for brpop / srem and the status lookup
# @return the queue id, or nil when the blocking pop produced nothing or the
#   queue is not ACTIVE
def pop(conn)
  _, q_id = conn.brpop(TURNTABLE, TURNTABLE_TIMEOUT)
  # brpop yields nothing on timeout; skip the SREM and status lookup instead of
  # issuing them with a nil queue id.
  return nil unless q_id

  conn.srem(IN_ROTATION, q_id)
  q_id if queue_status(conn, q_id) == ACTIVE
end

.pop_job(conn, q_id) ⇒ Object

Atomically pop a job. If nothing is in the queue, set the queue status to empty.



60
61
62
# File 'lib/roundhouse/monitor.rb', line 60

# Runs the POP_JOB script against the queue's job list ("<QUEUE>:<q_id>") and
# its status bucket, passing the queue id as the only argument.
#
# @param conn connection object the script is called with
# @param q_id queue identifier
# @return the script's reply
def pop_job(conn, q_id)
  job_list = "#{QUEUE}:#{q_id}"
  keys = [job_list, status_bucket(q_id)]
  POP_JOB.call(conn, keys, [q_id])
end

.push(conn, q_id) ⇒ Object

Atomic push. Push the queue into the turntable if and only if the queue status is active and the queue is not already in rotation.



38
39
40
41
# File 'lib/roundhouse/monitor.rb', line 38

# Runs the TURNTABLE_PUSH script for the given queue, passing the queue's
# status bucket, the turntable and the in-rotation set as keys and the queue id
# as the only argument.
#
# @param conn connection object the script is called with
# @param q_id queue identifier
# @return the script's reply
def push(conn, q_id)
  # NOTE: this version of redis-namespace has a bug when you do keys: argv: params
  keys = [status_bucket(q_id), TURNTABLE, IN_ROTATION]
  TURNTABLE_PUSH.call(conn, keys, [q_id])
end

.push_job(conn, payloads) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/roundhouse/monitor.rb', line 64

# Enqueues a batch of job payloads.
#
# When the first payload carries an 'at' value the whole batch is handed to
# `schedule` instead (NOTE(review): `schedule` is not defined in the code
# visible here). Otherwise every payload is stamped with the same
# 'enqueued_at' time, serialized with Roundhouse.dump_json and pushed onto the
# job list of the FIRST payload's 'queue_id'; that queue is then conditionally
# put into rotation.
#
# @param conn connection object used for lpush and the rotation script
# @param payloads list of job hashes; each entry is mutated ('enqueued_at')
# @return the result of `schedule` or of `maybe_add_to_rotation`
def push_job(conn, payloads)
  head = payloads.first
  return schedule(conn, payloads) if head['at']

  q_id = head['queue_id']
  stamped_at = Time.now.to_f
  serialized = payloads.each_with_object([]) do |entry, out|
    entry['enqueued_at'.freeze] = stamped_at
    out << Roundhouse.dump_json(entry)
  end
  conn.lpush("#{QUEUE}:#{q_id}", serialized)

  maybe_add_to_rotation(conn, q_id)
end

.queue_status(conn, q_id) ⇒ Object



94
95
96
# File 'lib/roundhouse/monitor.rb', line 94

# Looks up the status stored for a queue in its status bucket hash.
#
# A queue without a stored status counts as EMPTY. The original spelled this
# as `....to_i || EMPTY`, but to_i never returns nil, so that fallback could
# never run; the default is now applied explicitly to the missing value.
#
# @param conn connection object used for hget
# @param q_id queue identifier
# @return [Integer] the stored status converted with to_i, or EMPTY when the
#   hash has no entry for the queue
def queue_status(conn, q_id)
  raw = conn.hget(status_bucket(q_id), q_id)
  raw.nil? ? EMPTY : raw.to_i
end

.rebuild_turntable! ⇒ Object

TODO: This needs to be in a Lua script and atomic. Need to test how well this scales too. Should be testing about 10,000 queues.



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/roundhouse/monitor.rb', line 118

# Rebuilds the turntable from the stored queue statuses and job list lengths.
#
# Steps:
# 1. Read every status bucket hash listed in the BUCKETS set (pipelined).
# 2. Read the length of every known queue's job list (pipelined).
# 3. In one MULTI: clear the turntable AND the in-rotation set, then walk the
#    queues:
#    - SUSPENDED queues are left alone;
#    - ACTIVE queues with jobs go back into rotation, ACTIVE queues without
#      jobs are marked EMPTY;
#    - EMPTY (or missing-status) queues with jobs go into rotation and are
#      marked ACTIVE;
#    - any other status value is replaced by SUSPENDED.
#
# The in-rotation set is cleared together with the turntable: leaving it
# untouched would keep ids of queues that are no longer on the turntable, and
# the push scripts refuse to re-add an id that is still a member of that set.
#
# The reads in steps 1-2 are not atomic with the writes in step 3.
# NOTE(review): the meaning of the trailing `false` passed to set_queue_status
# is defined outside the code visible here.
def rebuild_turntable!
  Roundhouse.redis do |conn|
    buckets = conn.smembers(BUCKETS)
    queues = conn.pipelined do
      buckets.each { |bucket| conn.hgetall("#{STATUS}:#{bucket}") }
    end

    all_queue_ids = queues.flat_map(&:keys)
    queue_len_res = conn.pipelined do
      all_queue_ids.each { |queue| conn.llen("#{QUEUE}:#{queue}") }
    end

    queue_len = all_queue_ids.zip(queue_len_res)
    status = queues.inject({}) { |merged, bucket_status| merged.merge(bucket_status) }

    conn.multi do
      # Reset both structures so they stay in sync with each other.
      conn.del(TURNTABLE)
      conn.del(IN_ROTATION)

      queue_len.each do |(q_id, len)|
        case status[q_id].to_i
        when SUSPENDED then next
        when ACTIVE
          if len > 0
            conn.lpush(TURNTABLE, q_id)
            conn.sadd(IN_ROTATION, q_id)
          else
            set_queue_status(conn, q_id, EMPTY, false)
          end
        when EMPTY
          next if len <= 0
          conn.lpush(TURNTABLE, q_id)
          conn.sadd(IN_ROTATION, q_id)
          set_queue_status(conn, q_id, ACTIVE, false)
        else
          # Unknown status value: park the queue.
          set_queue_status(conn, q_id, SUSPENDED, false)
        end
      end
    end
  end
end

.requeue(conn, q_id, jobs) ⇒ Object

Bulk requeue (push from right). Usually done via Client, when Roundhouse is terminating



45
46
47
# File 'lib/roundhouse/monitor.rb', line 45

# Appends the given jobs to the right end of the queue's job list
# ("<QUEUE>:<q_id>") with a single rpush.
#
# @param conn connection object used for rpush
# @param q_id queue identifier
# @param jobs job payloads to put back
# @return the rpush reply
def requeue(conn, q_id, jobs)
  job_list = "#{QUEUE}:#{q_id}"
  conn.rpush(job_list, jobs)
end

.resume(conn, q_id) ⇒ Object



90
91
92
# File 'lib/roundhouse/monitor.rb', line 90

# Runs the RESUME_QUEUE script for the given queue, passing the queue's status
# bucket, the turntable and the in-rotation set as keys and the queue id as the
# only argument.
#
# @param conn connection object the script is called with
# @param q_id queue identifier
# @return the script's reply
def resume(conn, q_id)
  keys = [status_bucket(q_id), TURNTABLE, IN_ROTATION]
  RESUME_QUEUE.call(conn, keys, [q_id])
end

.set_queue_is_empty(conn, q_id) ⇒ Object



78
79
80
# File 'lib/roundhouse/monitor.rb', line 78

# Flags the given queue as EMPTY by delegating to set_queue_status.
#
# @param conn connection object handed through to set_queue_status
# @param q_id identifier of the queue to mark
# @return whatever set_queue_status returns
def set_queue_is_empty(conn, q_id)
  new_status = EMPTY
  set_queue_status(conn, q_id, new_status)
end

.status_bucket(q_id) ⇒ Object



107
108
109
# File 'lib/roundhouse/monitor.rb', line 107

# Builds the key of the status hash that holds the given queue's status:
# the STATUS prefix, a colon, and the queue's bucket number.
#
# @param q_id queue identifier
# @return [String] key such as "status:3"
def status_bucket(q_id)
  [STATUS, bucket_num(q_id)].join(':')
end

.suspend(conn, q_id) ⇒ Object



86
87
88
# File 'lib/roundhouse/monitor.rb', line 86

# Flags the given queue as SUSPENDED by delegating to set_queue_status.
#
# @param conn connection object handed through to set_queue_status
# @param q_id identifier of the queue to mark
# @return whatever set_queue_status returns
def suspend(conn, q_id)
  new_status = SUSPENDED
  set_queue_status(conn, q_id, new_status)
end