Class: WorkerRoulette::Foreman
- Inherits: Object
  - Object
    - WorkerRoulette::Foreman
- Includes:
- Preprocessor
- Defined in:
- lib/worker_roulette/foreman.rb
Constant Summary collapse
- LUA_ENQUEUE_WORK_ORDERS =
<<-HERE
  local counter_key = KEYS[1]
  local job_board_key = KEYS[2]
  local sender_key = KEYS[3]
  local channel = KEYS[4]

  local work_order = ARGV[1]
  local job_notification = ARGV[2]

  local redis_call = redis.call
  local zscore = 'ZSCORE'
  local incr = 'INCR'
  local zadd = 'ZADD'
  local rpush = 'RPUSH'
  local publish = 'PUBLISH'
  local zcard = 'ZCARD'
  local del = 'DEL'

  local function enqueue_work_orders(work_order, job_notification)
    redis_call(rpush, sender_key, work_order)

    -- called when a work from a new sender is added
    if (redis_call(zscore, job_board_key, sender_key) == false) then
      local count = redis_call(incr, counter_key)
      redis_call(zadd, job_board_key, count, sender_key)
    end
  end

  enqueue_work_orders(work_order, job_notification)
HERE
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#namespace ⇒ Object
readonly
Returns the value of attribute namespace.
-
#preprocessors ⇒ Object
readonly
Returns the value of attribute preprocessors.
-
#sender ⇒ Object
readonly
Returns the value of attribute sender.
Instance Method Summary collapse
- #counter_key ⇒ Object
- #enqueue(work_order, &callback) ⇒ Object
- #enqueue_work_order(work_order, headers = {}, &callback) ⇒ Object
-
#initialize(redis_pool, sender, namespace = nil, preprocessors = []) ⇒ Foreman
constructor
A new instance of Foreman.
- #job_board_key ⇒ Object
- #sender_key ⇒ Object
Methods included from Preprocessor
Constructor Details
#initialize(redis_pool, sender, namespace = nil, preprocessors = []) ⇒ Foreman
Returns a new instance of Foreman.
39 40 41 42 43 44 45 46 |
# File 'lib/worker_roulette/foreman.rb', line 39
def initialize(redis_pool, sender, namespace = nil, preprocessors = [])
  @redis_pool = redis_pool
  @sender = sender
  @preprocessors = preprocessors
  @namespace = namespace
  @channel = namespace || WorkerRoulette::JOB_NOTIFICATIONS
  @lua = Lua.new(@redis_pool)
end
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
7 8 9 |
# File 'lib/worker_roulette/foreman.rb', line 7
def channel
  @channel
end
#namespace ⇒ Object (readonly)
Returns the value of attribute namespace.
7 8 9 |
# File 'lib/worker_roulette/foreman.rb', line 7
def namespace
  @namespace
end
#preprocessors ⇒ Object (readonly)
Returns the value of attribute preprocessors.
7 8 9 |
# File 'lib/worker_roulette/foreman.rb', line 7
def preprocessors
  @preprocessors
end
#sender ⇒ Object (readonly)
Returns the value of attribute sender.
7 8 9 |
# File 'lib/worker_roulette/foreman.rb', line 7
def sender
  @sender
end
Instance Method Details
#counter_key ⇒ Object
62 63 64 |
# File 'lib/worker_roulette/foreman.rb', line 62
def counter_key
  @counter_key ||= WorkerRoulette.counter_key(@namespace)
end
#enqueue(work_order, &callback) ⇒ Object
53 54 55 56 |
# File 'lib/worker_roulette/foreman.rb', line 53
def enqueue(work_order, &callback)
  @lua.call(LUA_ENQUEUE_WORK_ORDERS, [counter_key, job_board_key, sender_key, @channel],
            [WorkerRoulette.dump(preprocess(work_order, channel)), WorkerRoulette::JOB_NOTIFICATIONS], &callback)
end
#enqueue_work_order(work_order, headers = {}, &callback) ⇒ Object
48 49 50 51 |
# File 'lib/worker_roulette/foreman.rb', line 48
def enqueue_work_order(work_order, headers = {}, &callback)
  work_order = {'headers' => default_headers.merge(headers), 'payload' => work_order}
  enqueue(work_order, &callback)
end
#job_board_key ⇒ Object
58 59 60 |
# File 'lib/worker_roulette/foreman.rb', line 58
def job_board_key
  @job_board_key ||= WorkerRoulette.job_board_key(@namespace)
end
#sender_key ⇒ Object
66 67 68 |
# File 'lib/worker_roulette/foreman.rb', line 66
def sender_key
  @sender_key ||= WorkerRoulette.sender_key(@sender, @namespace)
end