Class: WorkerRoulette::Tradesman
- Inherits:
-
Object
- Object
- WorkerRoulette::Tradesman
- Defined in:
- lib/worker_roulette/tradesman.rb
Constant Summary collapse
- LUA_DRAIN_WORK_ORDERS =
" local empty_string = \"\"\n local job_board_key = KEYS[1]\n local last_sender_key = KEYS[2] or empty_string\n local sender_key = ARGV[1] or empty_string\n local redis_call = redis.call\n local lock_key_prefix = \"L*:\"\n local lock_value = 1\n local ex = \"EX\"\n local nx = \"NX\"\n local get = \"GET\"\n local set = \"SET\"\n local del = \"DEL\"\n local lrange = \"LRANGE\"\n local zrank = \"ZRANK\"\n local zrange = \"ZRANGE\"\n local zrem = \"ZREM\"\n local zcard = 'ZCARD'\n\n local function drain_work_orders(job_board_key, last_sender_key, sender_key)\n\n --kill lock for last_sender_key\n if last_sender_key ~= empty_string then\n local last_sender_lock_key = lock_key_prefix .. last_sender_key\n redis_call(del, last_sender_lock_key)\n end\n\n if sender_key == empty_string then\n sender_key = redis_call(zrange, job_board_key, 0, 0)[1] or empty_string\n\n -- return if job_board is empty\n if sender_key == empty_string then\n return {empty_string, {}, 0}\n end\n end\n\n local lock_key = lock_key_prefix .. sender_key\n local was_not_locked = redis_call(set, lock_key, lock_value, ex, 3, nx)\n\n if was_not_locked then\n local work_orders = redis_call(lrange, sender_key, 0, -1)\n redis_call(del, sender_key)\n\n redis_call(zrem, job_board_key, sender_key)\n local remaining_jobs = redis_call(zcard, job_board_key) or 0\n\n return {sender_key, work_orders, remaining_jobs}\n else\n local sender_index = redis_call(zrank, job_board_key, sender_key)\n local next_index = sender_index + 1\n local next_sender_key = redis_call(zrange, job_board_key, next_index, next_index)[1]\n if next_sender_key then\n return drain_work_orders(job_board_key, empty_string, next_sender_key)\n else\n -- return if job_board is empty\n return {empty_string, {}, 0}\n end\n end\n end\n\n return drain_work_orders(job_board_key, last_sender_key, empty_string)\n"- LUA_DRAIN_WORK_ORDERS_FOR_SENDER =
" local empty_string = \"\"\n local job_board_key = KEYS[1]\n local sender_key = KEYS[2] or empty_string\n local redis_call = redis.call\n local lock_key_prefix = \"L*:\"\n local lock_value = 1\n local get = \"GET\"\n local del = \"DEL\"\n local lrange = \"LRANGE\"\n local zrange = \"ZRANGE\"\n local zrem = \"ZREM\"\n\n local function drain_work_orders_for_sender(job_board_key, sender_key)\n\n if sender_key == empty_string then\n return {empty_string, {}, 0}\n end\n\n local lock_key = lock_key_prefix .. sender_key\n was_locked = redis_call(get, lock_key)\n\n if was_locked == 1 then\n local work_orders = redis_call(lrange, sender_key, 0, -1)\n redis_call(del, sender_key)\n\n redis_call(zrem, job_board_key, sender_key)\n\n return { sender_key, work_orders }\n else\n return { sender_key, {} }\n end\n end\n\n return drain_work_orders_for_sender(job_board_key, sender_key)\n"
Instance Attribute Summary collapse
-
#last_sender ⇒ Object
readonly
Returns the value of attribute last_sender.
-
#remaining_jobs ⇒ Object
readonly
Returns the value of attribute remaining_jobs.
-
#timer ⇒ Object
readonly
Returns the value of attribute timer.
Instance Method Summary collapse
- #get_more_work_for_last_sender(&on_message_callback) ⇒ Object
-
#initialize(redis_pool, evented, namespace = nil, polling_time = WorkerRoulette::DEFAULT_POLLING_TIME) ⇒ Tradesman
constructor
A new instance of Tradesman.
- #job_board_key ⇒ Object
- #more_work_orders!(&callback) ⇒ Object
- #wait_for_work_orders(&on_message_callback) ⇒ Object
- #work_orders!(&callback) ⇒ Object
Constructor Details
#initialize(redis_pool, evented, namespace = nil, polling_time = WorkerRoulette::DEFAULT_POLLING_TIME) ⇒ Tradesman
Returns a new instance of Tradesman.
105 106 107 108 109 110 111 112 113 |
# File 'lib/worker_roulette/tradesman.rb', line 105 def initialize(redis_pool, evented, namespace = nil, polling_time = WorkerRoulette::DEFAULT_POLLING_TIME) @evented = evented @polling_time = polling_time @redis_pool = redis_pool @namespace = namespace @channel = namespace || WorkerRoulette::JOB_NOTIFICATIONS @lua = Lua.new(@redis_pool) @remaining_jobs = 0 end |
Instance Attribute Details
#last_sender ⇒ Object (readonly)
Returns the value of attribute last_sender.
3 4 5 |
# File 'lib/worker_roulette/tradesman.rb', line 3 def last_sender @last_sender end |
#remaining_jobs ⇒ Object (readonly)
Returns the value of attribute remaining_jobs.
3 4 5 |
# File 'lib/worker_roulette/tradesman.rb', line 3 def remaining_jobs @remaining_jobs end |
#timer ⇒ Object (readonly)
Returns the value of attribute timer.
3 4 5 |
# File 'lib/worker_roulette/tradesman.rb', line 3 def timer @timer end |
Instance Method Details
#get_more_work_for_last_sender(&on_message_callback) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/worker_roulette/tradesman.rb', line 139 def get_more_work_for_last_sender(&) return unless more_work_orders! do |work| .call(work) if work.any? if @evented evented_drain_work_queue!(&) else non_evented_drain_work_queue!(&) end end end |
#job_board_key ⇒ Object
162 163 164 |
# File 'lib/worker_roulette/tradesman.rb', line 162 def job_board_key @job_board_key ||= WorkerRoulette.job_board_key(@namespace) end |
#more_work_orders!(&callback) ⇒ Object
151 152 153 154 155 156 157 158 159 160 |
# File 'lib/worker_roulette/tradesman.rb', line 151 def more_work_orders!(&callback) @lua.call(LUA_DRAIN_WORK_ORDERS_FOR_SENDER, [job_board_key, @last_sender]) do |results| sender_key = results[0] raise "wrong sender key returned from LUA_DRAIN_WORK_ORDERS_FOR_SENDER" unless sender_key == @last_sender work_orders = results[1] work = work_orders.map {|work_order| WorkerRoulette.load(work_order)} callback.call work if callback work end end |
#wait_for_work_orders(&on_message_callback) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/worker_roulette/tradesman.rb', line 115 def wait_for_work_orders(&) return unless work_orders! do |work| .call(work) if work.any? if @evented evented_drain_work_queue!(&) else non_evented_drain_work_queue!(&) end end end |
#work_orders!(&callback) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/worker_roulette/tradesman.rb', line 127 def work_orders!(&callback) @lua.call(LUA_DRAIN_WORK_ORDERS, [job_board_key, @last_sender], [nil]) do |results| sender_key = results[0] work_orders = results[1] @remaining_jobs = results[2] @last_sender = sender_key.split(':').last work = work_orders.map {|work_order| WorkerRoulette.load(work_order)} callback.call work if callback work end end |