Class: WorkerRoulette::Tradesman

Inherits:
Object
  • Object
show all
Includes:
Preprocessor
Defined in:
lib/worker_roulette/tradesman.rb

Constant Summary collapse

# Lua script (passed to @lua.call by #work_orders!) that claims one sender on
# the job board and drains that sender's queued work orders.
#   KEYS[1] - key of the job board (accessed with ZRANGE/ZRANK/ZREM/ZCARD)
#   KEYS[2] - previous sender key; when non-empty its "L*:"-prefixed lock key
#             is deleted before anything else happens
#   ARGV[1] - read into a top-level local, but the entry call at the bottom
#             always passes the empty string as sender, so it does not
#             influence the result
# Starting with the first member of the job board (ZRANGE 0 0) the script
# tries to take a per-sender lock with SET <lock> 1 EX 3 NX. If the lock is
# obtained it LRANGEs the sender's list, deletes it, removes the sender from
# the job board and returns {sender_key, work_orders, remaining_jobs}, where
# remaining_jobs is the ZCARD of the job board afterwards. If the lock is
# already held it retries with the member at the next rank. It returns
# {"", {}, 0} when there is no sender (left) to try.
LUA_DRAIN_WORK_ORDERS =
"  local empty_string      = \"\"\n  local job_board_key     = KEYS[1]\n  local last_sender_key   = KEYS[2] or empty_string\n  local sender_key        = ARGV[1] or empty_string\n  local redis_call        = redis.call\n  local lock_key_prefix   = \"L*:\"\n  local lock_value        = 1\n  local ex                = \"EX\"\n  local nx                = \"NX\"\n  local get               = \"GET\"\n  local set               = \"SET\"\n  local del               = \"DEL\"\n  local lrange            = \"LRANGE\"\n  local zrank             = \"ZRANK\"\n  local zrange            = \"ZRANGE\"\n  local zrem              = \"ZREM\"\n  local zcard             = 'ZCARD'\n\n  local function drain_work_orders(job_board_key, last_sender_key, sender_key)\n\n    --kill lock for last_sender_key\n    if last_sender_key ~= empty_string then\n      local last_sender_lock_key = lock_key_prefix .. last_sender_key\n      redis_call(del, last_sender_lock_key)\n    end\n\n    if sender_key == empty_string then\n      sender_key = redis_call(zrange, job_board_key, 0, 0)[1] or empty_string\n\n      -- return if job_board is empty\n      if sender_key == empty_string then\n        return {empty_string, {}, 0}\n      end\n    end\n\n    local lock_key       = lock_key_prefix .. 
sender_key\n    local was_not_locked = redis_call(set, lock_key, lock_value, ex, 3, nx)\n\n    if was_not_locked then\n      local work_orders    = redis_call(lrange, sender_key, 0, -1)\n      redis_call(del, sender_key)\n\n      redis_call(zrem, job_board_key, sender_key)\n      local remaining_jobs = redis_call(zcard, job_board_key) or 0\n\n      return {sender_key, work_orders, remaining_jobs}\n    else\n      local sender_index    = redis_call(zrank, job_board_key, sender_key)\n      local next_index      = sender_index + 1\n      local next_sender_key = redis_call(zrange, job_board_key, next_index, next_index)[1]\n      if next_sender_key then\n        return drain_work_orders(job_board_key, empty_string, next_sender_key)\n      else\n        -- return if job_board is empty\n        return {empty_string, {}, 0}\n      end\n    end\n  end\n\n  return drain_work_orders(job_board_key, last_sender_key, empty_string)\n"
# Lua script (passed to @lua.call by #more_work_orders!) that drains further
# work orders for one specific sender, but only while that sender's lock
# ("L*:" .. sender_key) is still set.
#   KEYS[1] - key of the job board
#   KEYS[2] - sender key to drain; an empty/missing key returns {"", {}, 0}
# When the lock is present the script LRANGEs the sender's list, deletes it,
# removes the sender from the job board and returns {sender_key, work_orders};
# otherwise it returns {sender_key, {}}.
#
# Two details matter for correctness:
# * `was_locked` is declared `local`; Redis scripting refuses to create global
#   variables from inside a script function.
# * GET hands the stored lock value to Lua as a string, and Lua's `==` never
#   coerces between strings and numbers, so the value is compared against
#   tostring(lock_value) rather than the number 1.
LUA_DRAIN_WORK_ORDERS_FOR_SENDER =
"  local empty_string     = \"\"\n  local job_board_key    = KEYS[1]\n  local sender_key       = KEYS[2] or empty_string\n  local redis_call       = redis.call\n  local lock_key_prefix  = \"L*:\"\n  local lock_value       = 1\n  local get              = \"GET\"\n  local del              = \"DEL\"\n  local lrange           = \"LRANGE\"\n  local zrange           = \"ZRANGE\"\n  local zrem             = \"ZREM\"\n\n  local function drain_work_orders_for_sender(job_board_key, sender_key)\n\n    if sender_key == empty_string then\n      return {empty_string, {}, 0}\n    end\n\n    local lock_key = lock_key_prefix .. sender_key\n    local was_locked = redis_call(get, lock_key)\n\n    if was_locked == tostring(lock_value) then\n      local work_orders = redis_call(lrange, sender_key, 0, -1)\n      redis_call(del, sender_key)\n\n      redis_call(zrem, job_board_key, sender_key)\n\n      return { sender_key, work_orders }\n    else\n      return { sender_key, {} }\n    end\n  end\n\n  return drain_work_orders_for_sender(job_board_key, sender_key)\n"

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Preprocessor

#preprocess

Constructor Details

#initialize(redis_pool, evented, namespace = nil, polling_time = WorkerRoulette::DEFAULT_POLLING_TIME, preprocessors = []) ⇒ Tradesman

Returns a new instance of Tradesman.



108
109
110
111
112
113
114
115
116
117
# File 'lib/worker_roulette/tradesman.rb', line 108

# Builds a tradesman around a Redis connection pool.
#
# @param redis_pool    stored and also handed to Lua.new to build the script runner
# @param evented       truthy selects the evented drain path in
#                      #wait_for_work_orders / #get_more_work_for_last_sender
# @param namespace     used for the job board key and, when present, as the channel
# @param polling_time  stored as-is; defaults to WorkerRoulette::DEFAULT_POLLING_TIME
# @param preprocessors stored as-is and exposed through #preprocessors
def initialize(redis_pool, evented, namespace = nil, polling_time = WorkerRoulette::DEFAULT_POLLING_TIME, preprocessors = [])
  # Connection handling first: the Lua runner is built from the pool.
  @redis_pool = redis_pool
  @lua        = Lua.new(@redis_pool)

  # Naming: without a namespace the shared notifications channel is used.
  @namespace = namespace
  @channel   = @namespace || WorkerRoulette::JOB_NOTIFICATIONS

  # Behavioural switches supplied by the caller.
  @evented       = evented
  @polling_time  = polling_time
  @preprocessors = preprocessors

  # No job board has been inspected yet.
  @remaining_jobs = 0
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



6
7
8
# File 'lib/worker_roulette/tradesman.rb', line 6

# Reader for @channel. #initialize sets it to the namespace it was given or,
# when no namespace was given, to WorkerRoulette::JOB_NOTIFICATIONS.
def channel
  @channel
end

#last_sender ⇒ Object (readonly)

Returns the value of attribute last_sender.



6
7
8
# File 'lib/worker_roulette/tradesman.rb', line 6

# Reader for @last_sender. The value is assigned in #work_orders! from the
# first element of the Lua script result; #initialize does not set it, so it
# is nil until #work_orders! has yielded a result.
def last_sender
  @last_sender
end

#preprocessors ⇒ Object (readonly)

Returns the value of attribute preprocessors.



6
7
8
# File 'lib/worker_roulette/tradesman.rb', line 6

# Reader for @preprocessors, the list passed to #initialize (an empty array
# by default).
# NOTE(review): presumably consumed by Preprocessor#preprocess — confirm.
def preprocessors
  @preprocessors
end

#remaining_jobs ⇒ Object (readonly)

Returns the value of attribute remaining_jobs.



6
7
8
# File 'lib/worker_roulette/tradesman.rb', line 6

# Reader for @remaining_jobs. Starts at 0 in #initialize and is overwritten in
# #work_orders! with the third element of the Lua script result.
def remaining_jobs
  @remaining_jobs
end

#timer ⇒ Object (readonly)

Returns the value of attribute timer.



6
7
8
# File 'lib/worker_roulette/tradesman.rb', line 6

# Reader for @timer.
# NOTE(review): @timer is not assigned in any method shown here; presumably
# one of the drain-queue helpers sets it — confirm against the full source.
def timer
  @timer
end

Instance Method Details

#get_more_work_for_last_sender(&on_message_callback) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
# File 'lib/worker_roulette/tradesman.rb', line 145

# Fetches further work orders for the most recent sender and then hands the
# callback over to the drain-queue helper matching the @evented mode.
#
# Does nothing (returns nil) when called without a block. The block receives
# the array of work only when that array has at least one truthy element.
#
# @yieldparam work [Array] work orders loaded by #more_work_orders!
# @return whatever #more_work_orders! returns, or nil without a block
def get_more_work_for_last_sender(&on_message_callback)
  return if on_message_callback.nil?

  more_work_orders! do |orders|
    on_message_callback.call(orders) if orders.any?

    # Keep listening in the mode this tradesman was configured with.
    next evented_drain_work_queue!(&on_message_callback) if @evented

    non_evented_drain_work_queue!(&on_message_callback)
  end
end

#job_board_key ⇒ Object



168
169
170
# File 'lib/worker_roulette/tradesman.rb', line 168

# Key of the job board for this tradesman's namespace. The value comes from
# WorkerRoulette.job_board_key and is cached in @job_board_key after the first
# lookup.
def job_board_key
  return @job_board_key if @job_board_key

  @job_board_key = WorkerRoulette.job_board_key(@namespace)
end

#more_work_orders!(&callback) ⇒ Object



157
158
159
160
161
162
163
164
165
166
# File 'lib/worker_roulette/tradesman.rb', line 157

# Runs LUA_DRAIN_WORK_ORDERS_FOR_SENDER for the current @last_sender and
# loads every returned work order with WorkerRoulette.load.
#
# NOTE(review): unlike #work_orders!, the loaded orders are not passed through
# #preprocess here — confirm whether that difference is intentional.
#
# @yieldparam work [Array] the loaded work orders (yielded only if a block is given)
# @return whatever @lua.call returns; the block passed to it evaluates to the
#   loaded work orders
# @raise [RuntimeError] when the script answers with a sender key other than
#   @last_sender
def more_work_orders!(&callback)
  script_keys = [job_board_key, @last_sender]

  @lua.call(LUA_DRAIN_WORK_ORDERS_FOR_SENDER, script_keys) do |results|
    sender_matches = results[0] == @last_sender
    raise "wrong sender key returned from LUA_DRAIN_WORK_ORDERS_FOR_SENDER" unless sender_matches

    loaded = results[1].map { |raw_order| WorkerRoulette.load(raw_order) }
    callback.call(loaded) unless callback.nil?
    loaded
  end
end

#wait_for_work_orders(&on_message_callback) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/worker_roulette/tradesman.rb', line 119

# Drains the job board once via #work_orders! and then hands the callback over
# to the drain-queue helper matching the @evented mode.
#
# Does nothing (returns nil) when called without a block. The block receives
# the array of work only when that array has at least one truthy element.
#
# @yieldparam work [Array] work orders produced by #work_orders!
# @return whatever #work_orders! returns, or nil without a block
def wait_for_work_orders(&on_message_callback)
  return if on_message_callback.nil?

  work_orders! do |orders|
    on_message_callback.call(orders) if orders.any?

    # Keep listening in the mode this tradesman was configured with.
    next evented_drain_work_queue!(&on_message_callback) if @evented

    non_evented_drain_work_queue!(&on_message_callback)
  end
end

#work_orders!(&callback) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/worker_roulette/tradesman.rb', line 131

# Runs LUA_DRAIN_WORK_ORDERS against the job board, remembers which sender was
# drained and how many entries remain, and returns the loaded, preprocessed
# work orders.
#
# Side effects visible here: @last_sender and @remaining_jobs are overwritten
# from the script result, and QueueMetricTracker.track_all is called with the
# raw result when at least one work order came back.
#
# @yieldparam work [Array] the preprocessed work orders (yielded only if a block is given)
# @return whatever @lua.call returns; the block passed to it evaluates to the
#   preprocessed work orders
def work_orders!(&callback)
  script_keys = [job_board_key, @last_sender]

  @lua.call(LUA_DRAIN_WORK_ORDERS, script_keys, [nil]) do |results|
    raw_orders      = results[1]
    @remaining_jobs = results[2]
    @last_sender    = results[0]

    QueueMetricTracker.track_all(results) unless raw_orders.none?

    processed = raw_orders.map do |raw_order|
      preprocess(WorkerRoulette.load(raw_order), channel)
    end
    callback.call(processed) unless callback.nil?
    processed
  end
end