Class: WorkerRoulette::Foreman

Inherits:
Object
  • Object
show all
Includes:
Preprocessor
Defined in:
lib/worker_roulette/foreman.rb

Constant Summary collapse

LUA_ENQUEUE_WORK_ORDERS =
<<-HERE
  local counter_key       = KEYS[1]
  local job_board_key     = KEYS[2]
  local sender_key        = KEYS[3]
  local channel           = KEYS[4]

  local work_order        = ARGV[1]
  local job_notification  = ARGV[2]
  local redis_call        = redis.call
  local zscore            = 'ZSCORE'
  local incr              = 'INCR'
  local zadd              = 'ZADD'
  local rpush             = 'RPUSH'
  local publish           = 'PUBLISH'
  local zcard             = 'ZCARD'
  local del               = 'DEL'

  local function enqueue_work_orders(work_order, job_notification)
    redis_call(rpush, sender_key, work_order)

    -- called when a work from a new sender is added
    if (redis_call(zscore, job_board_key, sender_key) == false) then
      local count     = redis_call(incr, counter_key)
      redis_call(zadd, job_board_key, count, sender_key)
    end
  end

  enqueue_work_orders(work_order, job_notification)
HERE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Preprocessor

#preprocess

Constructor Details

#initialize(redis_pool, sender, namespace = nil, preprocessors = []) ⇒ Foreman

Returns a new instance of Foreman.



39
40
41
42
43
44
45
46
# File 'lib/worker_roulette/foreman.rb', line 39

def initialize(redis_pool, sender, namespace = nil, preprocessors = [])
  @redis_pool    = redis_pool
  @sender        = sender
  @preprocessors = preprocessors
  @namespace     = namespace
  @channel       = namespace || WorkerRoulette::JOB_NOTIFICATIONS
  @lua           = Lua.new(@redis_pool)
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



7
8
9
# File 'lib/worker_roulette/foreman.rb', line 7

def channel
  @channel
end

#namespaceObject (readonly)

Returns the value of attribute namespace.



7
8
9
# File 'lib/worker_roulette/foreman.rb', line 7

def namespace
  @namespace
end

#preprocessorsObject (readonly)

Returns the value of attribute preprocessors.



7
8
9
# File 'lib/worker_roulette/foreman.rb', line 7

def preprocessors
  @preprocessors
end

#senderObject (readonly)

Returns the value of attribute sender.



7
8
9
# File 'lib/worker_roulette/foreman.rb', line 7

def sender
  @sender
end

Instance Method Details

#counter_keyObject



62
63
64
# File 'lib/worker_roulette/foreman.rb', line 62

def counter_key
  @counter_key ||= WorkerRoulette.counter_key(@namespace)
end

#enqueue(work_order, &callback) ⇒ Object



53
54
55
56
# File 'lib/worker_roulette/foreman.rb', line 53

def enqueue(work_order, &callback)
  @lua.call(LUA_ENQUEUE_WORK_ORDERS, [counter_key, job_board_key, sender_key, @channel],
            [WorkerRoulette.dump(preprocess(work_order, channel)),  WorkerRoulette::JOB_NOTIFICATIONS], &callback)
end

#enqueue_work_order(work_order, headers = {}, &callback) ⇒ Object



48
49
50
51
# File 'lib/worker_roulette/foreman.rb', line 48

def enqueue_work_order(work_order, headers = {}, &callback)
  work_order = {'headers' => default_headers.merge(headers), 'payload' => work_order}
  enqueue(work_order, &callback)
end

#job_board_keyObject



58
59
60
# File 'lib/worker_roulette/foreman.rb', line 58

def job_board_key
  @job_board_key ||= WorkerRoulette.job_board_key(@namespace)
end

#sender_keyObject



66
67
68
# File 'lib/worker_roulette/foreman.rb', line 66

def sender_key
  @sender_key ||= WorkerRoulette.sender_key(@sender, @namespace)
end