Class: WorkerRoulette::Tradesman

Inherits:
Object
  • Object
show all
Defined in:
lib/worker_roulette/tradesman.rb

Constant Summary collapse

LUA_DRAIN_WORK_ORDERS =
"  local empty_string      = \"\"\n  local job_board_key     = KEYS[1]\n  local last_sender_key   = KEYS[2] or empty_string\n  local sender_key        = ARGV[1] or empty_string\n  local redis_call        = redis.call\n  local lock_key_prefix   = \"L*:\"\n  local lock_value        = 1\n  local ex                = \"EX\"\n  local nx                = \"NX\"\n  local get               = \"GET\"\n  local set               = \"SET\"\n  local del               = \"DEL\"\n  local lrange            = \"LRANGE\"\n  local zrank             = \"ZRANK\"\n  local zrange            = \"ZRANGE\"\n  local zrem              = \"ZREM\"\n  local zcard             = 'ZCARD'\n\n  local function drain_work_orders(job_board_key, last_sender_key, sender_key)\n\n    --kill lock for last_sender_key\n    if last_sender_key ~= empty_string then\n      local last_sender_lock_key = lock_key_prefix .. last_sender_key\n      redis_call(del, last_sender_lock_key)\n    end\n\n    if sender_key == empty_string then\n      sender_key = redis_call(zrange, job_board_key, 0, 0)[1] or empty_string\n\n      -- return if job_board is empty\n      if sender_key == empty_string then\n        return {empty_string, {}, 0}\n      end\n    end\n\n    local lock_key       = lock_key_prefix .. sender_key\n    local was_not_locked = redis_call(set, lock_key, lock_value, ex, 3, nx)\n\n    if was_not_locked then\n      local work_orders    = redis_call(lrange, sender_key, 0, -1)\n      redis_call(del, sender_key)\n\n      redis_call(zrem, job_board_key, sender_key)\n      local remaining_jobs = redis_call(zcard, job_board_key) or 0\n\n      return {sender_key, work_orders, remaining_jobs}\n    else\n      local sender_index    = redis_call(zrank, job_board_key, sender_key)\n      local next_index      = sender_index + 1\n      local next_sender_key = redis_call(zrange, job_board_key, next_index, next_index)[1]\n      if next_sender_key then\n        return drain_work_orders(job_board_key, empty_string, next_sender_key)\n      else\n        -- return if job_board is empty\n        return {empty_string, {}, 0}\n      end\n    end\n  end\n\n  return drain_work_orders(job_board_key, last_sender_key, empty_string)\n"
LUA_DRAIN_WORK_ORDERS_FOR_SENDER =
"  local empty_string     = \"\"\n  local job_board_key    = KEYS[1]\n  local sender_key       = KEYS[2] or empty_string\n  local redis_call       = redis.call\n  local lock_key_prefix  = \"L*:\"\n  local lock_value       = 1\n  local get              = \"GET\"\n  local del              = \"DEL\"\n  local lrange           = \"LRANGE\"\n  local zrange           = \"ZRANGE\"\n  local zrem             = \"ZREM\"\n\n  local function drain_work_orders_for_sender(job_board_key, sender_key)\n\n    if sender_key == empty_string then\n      return {empty_string, {}, 0}\n    end\n\n    local lock_key = lock_key_prefix .. sender_key\n    was_locked = redis_call(get, lock_key)\n\n    if was_locked == 1 then\n      local work_orders = redis_call(lrange, sender_key, 0, -1)\n      redis_call(del, sender_key)\n\n      redis_call(zrem, job_board_key, sender_key)\n\n      return { sender_key, work_orders }\n    else\n      return { sender_key, {} }\n    end\n  end\n\n  return drain_work_orders_for_sender(job_board_key, sender_key)\n"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis_pool, evented, namespace = nil, polling_time = WorkerRoulette::DEFAULT_POLLING_TIME) ⇒ Tradesman

Returns a new instance of Tradesman.



105
106
107
108
109
110
111
112
113
# File 'lib/worker_roulette/tradesman.rb', line 105

def initialize(redis_pool, evented, namespace = nil, polling_time = WorkerRoulette::DEFAULT_POLLING_TIME)
  @evented        = evented
  @polling_time   = polling_time
  @redis_pool     = redis_pool
  @namespace      = namespace
  @channel        = namespace || WorkerRoulette::JOB_NOTIFICATIONS
  @lua            = Lua.new(@redis_pool)
  @remaining_jobs = 0
end

Instance Attribute Details

#last_senderObject (readonly)

Returns the value of attribute last_sender.



3
4
5
# File 'lib/worker_roulette/tradesman.rb', line 3

def last_sender
  @last_sender
end

#remaining_jobsObject (readonly)

Returns the value of attribute remaining_jobs.



3
4
5
# File 'lib/worker_roulette/tradesman.rb', line 3

def remaining_jobs
  @remaining_jobs
end

#timerObject (readonly)

Returns the value of attribute timer.



3
4
5
# File 'lib/worker_roulette/tradesman.rb', line 3

def timer
  @timer
end

Instance Method Details

#get_more_work_for_last_sender(&on_message_callback) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/worker_roulette/tradesman.rb', line 139

def get_more_work_for_last_sender(&on_message_callback)
  return unless on_message_callback
  more_work_orders! do |work|
    on_message_callback.call(work) if work.any?
    if @evented
      evented_drain_work_queue!(&on_message_callback)
    else
      non_evented_drain_work_queue!(&on_message_callback)
    end
  end
end

#job_board_keyObject



162
163
164
# File 'lib/worker_roulette/tradesman.rb', line 162

def job_board_key
  @job_board_key ||= WorkerRoulette.job_board_key(@namespace)
end

#more_work_orders!(&callback) ⇒ Object



151
152
153
154
155
156
157
158
159
160
# File 'lib/worker_roulette/tradesman.rb', line 151

def more_work_orders!(&callback)
  @lua.call(LUA_DRAIN_WORK_ORDERS_FOR_SENDER, [job_board_key, @last_sender]) do |results|
    sender_key      = results[0]
    raise "wrong sender key returned from LUA_DRAIN_WORK_ORDERS_FOR_SENDER" unless sender_key == @last_sender
    work_orders     = results[1]
    work            = work_orders.map {|work_order| WorkerRoulette.load(work_order)}
    callback.call work if callback
    work
  end
end

#wait_for_work_orders(&on_message_callback) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
# File 'lib/worker_roulette/tradesman.rb', line 115

def wait_for_work_orders(&on_message_callback)
  return unless on_message_callback
  work_orders! do |work|
    on_message_callback.call(work) if work.any?
    if @evented
      evented_drain_work_queue!(&on_message_callback)
    else
      non_evented_drain_work_queue!(&on_message_callback)
    end
  end
end

#work_orders!(&callback) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
# File 'lib/worker_roulette/tradesman.rb', line 127

def work_orders!(&callback)
  @lua.call(LUA_DRAIN_WORK_ORDERS, [job_board_key, @last_sender], [nil]) do |results|
    sender_key      = results[0]
    work_orders     = results[1]
    @remaining_jobs = results[2]
    @last_sender    = sender_key.split(':').last
    work            = work_orders.map {|work_order| WorkerRoulette.load(work_order)}
    callback.call work if callback
    work
  end
end