Class: QlessLua

Inherits:
Object
  • Object
show all
Defined in:
lib/qless_lua.rb

Overview

A wrapper that will give you ‘qless.lua` source to be used with your redis

Constant Summary collapse

QLESS_LIB_SOURCE =
<<-LUA_SOURCE.strip.freeze
  -------------------------------------------------------------------------------
-- Forward declarations to make everything happy
-------------------------------------------------------------------------------
local Qless = {
ns = 'ql:'
}

-- Queue forward delcaration
local QlessQueue = {
ns = Qless.ns .. 'q:'
}
QlessQueue.__index = QlessQueue

-- Worker forward declaration
local QlessWorker = {
ns = Qless.ns .. 'w:'
}
QlessWorker.__index = QlessWorker

-- Job forward declaration
local QlessJob = {
ns = Qless.ns .. 'j:'
}
QlessJob.__index = QlessJob

-- RecurringJob forward declaration
local QlessRecurringJob = {}
QlessRecurringJob.__index = QlessRecurringJob

-- Config forward declaration
Qless.config = {}

-- Extend a table. This comes up quite frequently
local function tbl_extend(self, other)
for i, v in ipairs(other) do
  table.insert(self, v)
end
end

-- This is essentially the same as redis' publish, but it prefixes the channel
-- with the Qless namespace
function Qless.publish(channel, message)
redis.call('publish', Qless.ns .. channel, message)
end

-- Return a job object given its job id
function Qless.job(jid)
assert(jid, 'Job(): no jid provided')
local job = {}
setmetatable(job, QlessJob)
job.jid = jid
return job
end

-- Return a recurring job object
function Qless.recurring(jid)
assert(jid, 'Recurring(): no jid provided')
local job = {}
setmetatable(job, QlessRecurringJob)
job.jid = jid
return job
end

-- Failed([group, [start, [limit]]])
-- ------------------------------------
-- If no group is provided, this returns a JSON blob of the counts of the
-- various groups of failures known. If a group is provided, it will report up
-- to `limit` from `start` of the jobs affected by that issue.
--
--  # If no group, then...
--  {
--      'group1': 1,
--      'group2': 5,
--      ...
--  }
--
--  # If a group is provided, then...
--  {
--      'total': 20,
--      'jobs': [
--          {
--              # All the normal keys for a job
--              'jid': ...,
--              'data': ...
--              # The message for this particular instance
--              'message': ...,
--              'group': ...,
--          }, ...
--      ]
--  }
--
function Qless.failed(group, start, limit)
start = assert(tonumber(start or 0),
  'Failed(): Arg "start" is not a number: ' .. (start or 'nil'))
limit = assert(tonumber(limit or 25),
  'Failed(): Arg "limit" is not a number: ' .. (limit or 'nil'))

if group then
  -- If a group was provided, then we should do paginated lookup
  return {
    total = redis.call('llen', 'ql:f:' .. group),
    jobs  = redis.call('lrange', 'ql:f:' .. group, start, start + limit - 1)
  }
else
  -- Otherwise, we should just list all the known failure groups we have
  local response = {}
  local groups = redis.call('smembers', 'ql:failures')
  for index, group in ipairs(groups) do
    response[group] = redis.call('llen', 'ql:f:' .. group)
  end
  return response
end
end

-- Jobs(now, 'complete', [offset, [count]])
-- Jobs(now, (
--          'stalled' | 'running' | 'scheduled' | 'depends', 'recurring'
--      ), queue, [offset, [count]])
-------------------------------------------------------------------------------
-- Return all the job ids currently considered to be in the provided state
-- in a particular queue. The response is a list of job ids:
--
--  [
--      jid1,
--      jid2,
--      ...
--  ]
function Qless.jobs(now, state, ...)
assert(state, 'Jobs(): Arg "state" missing')
if state == 'complete' then
  local offset = assert(tonumber(arg[1] or 0),
    'Jobs(): Arg "offset" not a number: ' .. tostring(arg[1]))
  local count  = assert(tonumber(arg[2] or 25),
    'Jobs(): Arg "count" not a number: ' .. tostring(arg[2]))
  return redis.call('zrevrange', 'ql:completed', offset,
    offset + count - 1)
else
  local name  = assert(arg[1], 'Jobs(): Arg "queue" missing')
  local offset = assert(tonumber(arg[2] or 0),
    'Jobs(): Arg "offset" not a number: ' .. tostring(arg[2]))
  local count  = assert(tonumber(arg[3] or 25),
    'Jobs(): Arg "count" not a number: ' .. tostring(arg[3]))

  local queue = Qless.queue(name)
  if state == 'running' then
    return queue.locks.peek(now, offset, count)
  elseif state == 'stalled' then
    return queue.locks.expired(now, offset, count)
  elseif state == 'scheduled' then
    queue:check_scheduled(now, queue.scheduled.length())
    return queue.scheduled.peek(now, offset, count)
  elseif state == 'depends' then
    return queue.depends.peek(now, offset, count)
  elseif state == 'recurring' then
    return queue.recurring.peek('+inf', offset, count)
  else
    error('Jobs(): Unknown type "' .. state .. '"')
  end
end
end

-- Track()
-- Track(now, ('track' | 'untrack'), jid)
-- ------------------------------------------
-- If no arguments are provided, it returns details of all currently-tracked
-- jobs. If the first argument is 'track', then it will start tracking the job
-- associated with that id, and 'untrack' stops tracking it. In this context,
-- tracking is nothing more than saving the job to a list of jobs that are
-- considered special.
--
--  {
--      'jobs': [
--          {
--              'jid': ...,
--              # All the other details you'd get from 'get'
--          }, {
--              ...
--          }
--      ], 'expired': [
--          # These are all the jids that are completed and whose data expired
--          'deadbeef',
--          ...,
--          ...,
--      ]
--  }
--
function Qless.track(now, command, jid)
if command ~= nil then
  assert(jid, 'Track(): Arg "jid" missing')
  -- Verify that job exists
  assert(Qless.job(jid):exists(), 'Track(): Job does not exist')
  if string.lower(command) == 'track' then
    Qless.publish('track', jid)
    return redis.call('zadd', 'ql:tracked', now, jid)
  elseif string.lower(command) == 'untrack' then
    Qless.publish('untrack', jid)
    return redis.call('zrem', 'ql:tracked', jid)
  else
    error('Track(): Unknown action "' .. command .. '"')
  end
else
  local response = {
    jobs = {},
    expired = {}
  }
  local jids = redis.call('zrange', 'ql:tracked', 0, -1)
  for index, jid in ipairs(jids) do
    local data = Qless.job(jid):data()
    if data then
      table.insert(response.jobs, data)
    else
      table.insert(response.expired, jid)
    end
  end
  return response
end
end

-- tag(now, ('add' | 'remove'), jid, tag, [tag, ...])
-- tag(now, 'get', tag, [offset, [count]])
-- tag(now, 'top', [offset, [count]])
-- -----------------------------------------------------------------------------
-- Accepts a jid, 'add' or 'remove', and then a list of tags
-- to either add or remove from the job. Alternatively, 'get',
-- a tag to get jobs associated with that tag, and offset and
-- count
--
-- If 'add' or 'remove', the response is a list of the jobs
-- current tags, or False if the job doesn't exist. If 'get',
-- the response is of the form:
--
--  {
--      total: ...,
--      jobs: [
--          jid,
--          ...
--      ]
--  }
--
-- If 'top' is supplied, it returns the most commonly-used tags
-- in a paginated fashion.
function Qless.tag(now, command, ...)
assert(command,
  'Tag(): Arg "command" must be "add", "remove", "get" or "top"')

if command == 'add' then
  local jid  = assert(arg[1], 'Tag(): Arg "jid" missing')
  local tags = redis.call('hget', QlessJob.ns .. jid, 'tags')
  -- If the job has been canceled / deleted, then return false
  if tags then
    -- Decode the json blob, convert to dictionary
    tags = cjson.decode(tags)
    local _tags = {}
    for i,v in ipairs(tags) do _tags[v] = true end

    -- Otherwise, add the job to the sorted set with that tags
    for i=2,#arg do
      local tag = arg[i]
      if _tags[tag] == nil or _tags[tag] == false then
        _tags[tag] = true
        table.insert(tags, tag)
      end
      redis.call('zadd', 'ql:t:' .. tag, now, jid)
      redis.call('zincrby', 'ql:tags', 1, tag)
    end

    redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags))
    return tags
  else
    error('Tag(): Job ' .. jid .. ' does not exist')
  end
elseif command == 'remove' then
  local jid  = assert(arg[1], 'Tag(): Arg "jid" missing')
  local tags = redis.call('hget', QlessJob.ns .. jid, 'tags')
  -- If the job has been canceled / deleted, then return false
  if tags then
    -- Decode the json blob, convert to dictionary
    tags = cjson.decode(tags)
    local _tags = {}
    for i,v in ipairs(tags) do _tags[v] = true end

    -- Otherwise, add the job to the sorted set with that tags
    for i=2,#arg do
      local tag = arg[i]
      _tags[tag] = nil
      redis.call('zrem', 'ql:t:' .. tag, jid)
      redis.call('zincrby', 'ql:tags', -1, tag)
    end

    local results = {}
    for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end

    redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results))
    return results
  else
    error('Tag(): Job ' .. jid .. ' does not exist')
  end
elseif command == 'get' then
  local tag    = assert(arg[1], 'Tag(): Arg "tag" missing')
  local offset = assert(tonumber(arg[2] or 0),
    'Tag(): Arg "offset" not a number: ' .. tostring(arg[2]))
  local count  = assert(tonumber(arg[3] or 25),
    'Tag(): Arg "count" not a number: ' .. tostring(arg[3]))
  return {
    total = redis.call('zcard', 'ql:t:' .. tag),
    jobs  = redis.call('zrange', 'ql:t:' .. tag, offset, offset + count - 1)
  }
elseif command == 'top' then
  local offset = assert(tonumber(arg[1] or 0) , 'Tag(): Arg "offset" not a number: ' .. tostring(arg[1]))
  local count  = assert(tonumber(arg[2] or 25), 'Tag(): Arg "count" not a number: ' .. tostring(arg[2]))
  return redis.call('zrevrangebyscore', 'ql:tags', '+inf', 2, 'limit', offset, count)
else
  error('Tag(): First argument must be "add", "remove" or "get"')
end
end

-- Cancel(...)
-- --------------
-- Cancel a job from taking place. It will be deleted from the system, and any
-- attempts to renew a heartbeat will fail, and any attempts to complete it
-- will fail. If you try to get the data on the object, you will get nothing.
function Qless.cancel(...)
-- Dependents is a mapping of a job to its dependent jids
local dependents = {}
for _, jid in ipairs(arg) do
  dependents[jid] = redis.call(
    'smembers', QlessJob.ns .. jid .. '-dependents') or {}
end

-- Now, we'll loop through every jid we intend to cancel, and we'll go
-- make sure that this operation will be ok
for i, jid in ipairs(arg) do
  for j, dep in ipairs(dependents[jid]) do
    if dependents[dep] == nil or dependents[dep] == false then
      error('Cancel(): ' .. jid .. ' is a dependency of ' .. dep ..
         ' but is not mentioned to be canceled')
    end
  end
end

-- If we've made it this far, then we are good to go. We can now just
-- remove any trace of all these jobs, as they form a dependent clique
for _, jid in ipairs(arg) do
  -- Find any stage it's associated with and remove its from that stage
  local state, queue, failure, worker = unpack(redis.call(
    'hmget', QlessJob.ns .. jid, 'state', 'queue', 'failure', 'worker'))

  if state ~= 'complete' then
    -- Send a message out on the appropriate channels
    local encoded = cjson.encode({
      jid    = jid,
      worker = worker,
      event  = 'canceled',
      queue  = queue
    })
    Qless.publish('log', encoded)

    -- Remove this job from whatever worker has it, if any
    if worker and (worker ~= '') then
      redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)
      -- If necessary, send a message to the appropriate worker, too
      Qless.publish('w:' .. worker, encoded)
    end

    -- Remove it from that queue
    if queue then
      local queue = Qless.queue(queue)
      queue.work.remove(jid)
      queue.locks.remove(jid)
      queue.scheduled.remove(jid)
      queue.depends.remove(jid)
    end

    -- We should probably go through all our dependencies and remove
    -- ourselves from the list of dependents
    for i, j in ipairs(redis.call(
      'smembers', QlessJob.ns .. jid .. '-dependencies')) do
      redis.call('srem', QlessJob.ns .. j .. '-dependents', jid)
    end

    -- Delete any notion of dependencies it has
    redis.call('del', QlessJob.ns .. jid .. '-dependencies')

    -- If we're in the failed state, remove all of our data
    if state == 'failed' then
      failure = cjson.decode(failure)
      -- We need to make this remove it from the failed queues
      redis.call('lrem', 'ql:f:' .. failure.group, 0, jid)
      if redis.call('llen', 'ql:f:' .. failure.group) == 0 then
        redis.call('srem', 'ql:failures', failure.group)
      end
      -- Remove one count from the failed count of the particular
      -- queue
      local bin = failure.when - (failure.when % 86400)
      local failed = redis.call(
        'hget', 'ql:s:stats:' .. bin .. ':' .. queue, 'failed')
      redis.call('hset',
        'ql:s:stats:' .. bin .. ':' .. queue, 'failed', failed - 1)
    end

    -- Remove it as a job that's tagged with this particular tag
    local tags = cjson.decode(
      redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
    for i, tag in ipairs(tags) do
      redis.call('zrem', 'ql:t:' .. tag, jid)
      redis.call('zincrby', 'ql:tags', -1, tag)
    end

    -- If the job was being tracked, we should notify
    if redis.call('zscore', 'ql:tracked', jid) ~= false then
      Qless.publish('canceled', jid)
    end

    -- Just go ahead and delete our data
    redis.call('del', QlessJob.ns .. jid)
    redis.call('del', QlessJob.ns .. jid .. '-history')
  end
end

return arg
end

-------------------------------------------------------------------------------
-- Configuration interactions
-------------------------------------------------------------------------------

-- This represents our default configuration settings
Qless.config.defaults = {
['application']        = 'qless',
['heartbeat']          = 60,
['grace-period']       = 10,
['stats-history']      = 30,
['histogram-history']  = 7,
['jobs-history-count'] = 50000,
['jobs-history']       = 604800
}

-- Get one or more of the keys
Qless.config.get = function(key, default)
if key then
  return redis.call('hget', 'ql:config', key) or
    Qless.config.defaults[key] or default
else
  -- Inspired by redis-lua https://github.com/nrk/redis-lua/blob/version-2.0/src/redis.lua
  local reply = redis.call('hgetall', 'ql:config')
  for i = 1, #reply, 2 do
    Qless.config.defaults[reply[i]] = reply[i + 1]
  end
  return Qless.config.defaults
end
end

-- Set a configuration variable
Qless.config.set = function(option, value)
assert(option, 'config.set(): Arg "option" missing')
assert(value , 'config.set(): Arg "value" missing')
-- Send out a log message
Qless.publish('log', cjson.encode({
  event  = 'config_set',
  option = option,
  value  = value
}))

redis.call('hset', 'ql:config', option, value)
end

-- Unset a configuration option
Qless.config.unset = function(option)
assert(option, 'config.unset(): Arg "option" missing')
-- Send out a log message
Qless.publish('log', cjson.encode({
  event  = 'config_unset',
  option = option
}))

redis.call('hdel', 'ql:config', option)
end
-------------------------------------------------------------------------------
-- Job Class
--
-- It returns an object that represents the job with the provided JID
-------------------------------------------------------------------------------

-- This gets all the data associated with the job with the provided id. If the
-- job is not found, it returns nil. If found, it returns an object with the
-- appropriate properties
function QlessJob:data(...)
local job = redis.call(
    'hmget', QlessJob.ns .. self.jid, 'jid', 'klass', 'state', 'queue',
    'worker', 'priority', 'expires', 'retries', 'remaining', 'data',
    'tags', 'failure', 'spawned_from_jid')

-- Return nil if we haven't found it
if not job[1] then
  return nil
end

local data = {
  jid              = job[1],
  klass            = job[2],
  state            = job[3],
  queue            = job[4],
  worker           = job[5] or '',
  tracked          = redis.call(
    'zscore', 'ql:tracked', self.jid) ~= false,
  priority         = tonumber(job[6]),
  expires          = tonumber(job[7]) or 0,
  retries          = tonumber(job[8]),
  remaining        = math.floor(tonumber(job[9])),
  data             = job[10],
  tags             = cjson.decode(job[11]),
  history          = self:history(),
  failure          = cjson.decode(job[12] or '{}'),
  spawned_from_jid = job[13],
  dependents       = redis.call(
    'smembers', QlessJob.ns .. self.jid .. '-dependents'),
  dependencies     = redis.call(
    'smembers', QlessJob.ns .. self.jid .. '-dependencies')
}

if #arg > 0 then
  -- This section could probably be optimized, but I wanted the interface
  -- in place first
  local response = {}
  for index, key in ipairs(arg) do
    table.insert(response, data[key])
  end
  return response
else
  return data
end
end

-- Complete a job and optionally put it in another queue, either scheduled or
-- to be considered waiting immediately. It can also optionally accept other
-- jids on which this job will be considered dependent before it's considered
-- valid.
--
-- The variable-length arguments may be pairs of the form:
--
--      ('next'   , queue) : The queue to advance it to next
--      ('delay'  , delay) : The delay for the next queue
--      ('depends',        : Json of jobs it depends on in the new queue
--          '["jid1", "jid2", ...]')
---
function QlessJob:complete(now, worker, queue, raw_data, ...)
assert(worker, 'Complete(): Arg "worker" missing')
assert(queue , 'Complete(): Arg "queue" missing')
local data = assert(cjson.decode(raw_data),
  'Complete(): Arg "data" missing or not JSON: ' .. tostring(raw_data))

-- Read in all the optional parameters
local options = {}
for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end

-- Sanity check on optional args
local nextq   = options['next']
local delay   = assert(tonumber(options['delay'] or 0))
local depends = assert(cjson.decode(options['depends'] or '[]'),
  'Complete(): Arg "depends" not JSON: ' .. tostring(options['depends']))

-- Depends doesn't make sense without nextq
if options['delay'] and nextq == nil then
  error('Complete(): "delay" cannot be used without a "next".')
end

-- Depends doesn't make sense without nextq
if options['depends'] and nextq == nil then
  error('Complete(): "depends" cannot be used without a "next".')
end

-- The bin is midnight of the provided day
-- 24 * 60 * 60 = 86400
local bin = now - (now % 86400)

-- First things first, we should see if the worker still owns this job
local lastworker, state, priority, retries, current_queue = unpack(
  redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state',
    'priority', 'retries', 'queue'))

if lastworker == false then
  error('Complete(): Job ' .. self.jid .. ' does not exist')
elseif (state ~= 'running') then
  error('Complete(): Job ' .. self.jid .. ' is not currently running: ' ..
    state)
elseif lastworker ~= worker then
  error('Complete(): Job ' .. self.jid ..
    ' has been handed out to another worker: ' .. tostring(lastworker))
elseif queue ~= current_queue then
  error('Complete(): Job ' .. self.jid .. ' running in another queue: ' ..
    tostring(current_queue))
end

-- Now we can assume that the worker does own the job. We need to
--    1) Remove the job from the 'locks' from the old queue
--    2) Enqueue it in the next stage if necessary
--    3) Update the data
--    4) Mark the job as completed, remove the worker, remove expires, and
--          update history
self:history(now, 'done')

if raw_data then
  redis.call('hset', QlessJob.ns .. self.jid, 'data', raw_data)
end

-- Remove the job from the previous queue
local queue_obj = Qless.queue(queue)
queue_obj.work.remove(self.jid)
queue_obj.locks.remove(self.jid)
queue_obj.scheduled.remove(self.jid)

----------------------------------------------------------
-- This is the massive stats update that we have to do
----------------------------------------------------------
-- This is how long we've been waiting to get popped
-- local waiting = math.floor(now) - history[#history]['popped']
local time = tonumber(
  redis.call('hget', QlessJob.ns .. self.jid, 'time') or now)
local waiting = now - time
Qless.queue(queue):stat(now, 'run', waiting)
redis.call('hset', QlessJob.ns .. self.jid,
  'time', string.format("%.20f", now))

-- Remove this job from the jobs that the worker that was running it has
redis.call('zrem', 'ql:w:' .. worker .. ':jobs', self.jid)

if redis.call('zscore', 'ql:tracked', self.jid) ~= false then
  Qless.publish('completed', self.jid)
end

if nextq then
  queue_obj = Qless.queue(nextq)
  -- Send a message out to log
  Qless.publish('log', cjson.encode({
    jid   = self.jid,
    event = 'advanced',
    queue = queue,
    to    = nextq
  }))

  -- Enqueue the job
  self:history(now, 'put', {q = nextq})

  -- We're going to make sure that this queue is in the
  -- set of known queues
  if redis.call('zscore', 'ql:queues', nextq) == false then
    redis.call('zadd', 'ql:queues', now, nextq)
  end

  redis.call('hmset', QlessJob.ns .. self.jid,
    'state', 'waiting',
    'worker', '',
    'failure', '{}',
    'queue', nextq,
    'expires', 0,
    'remaining', tonumber(retries))

  if (delay > 0) and (#depends == 0) then
    queue_obj.scheduled.add(now + delay, self.jid)
    return 'scheduled'
  else
    -- These are the jids we legitimately have to wait on
    local count = 0
    for i, j in ipairs(depends) do
      -- Make sure it's something other than 'nil' or complete.
      local state = redis.call('hget', QlessJob.ns .. j, 'state')
      if (state and state ~= 'complete') then
        count = count + 1
        redis.call(
          'sadd', QlessJob.ns .. j .. '-dependents',self.jid)
        redis.call(
          'sadd', QlessJob.ns .. self.jid .. '-dependencies', j)
      end
    end
    if count > 0 then
      queue_obj.depends.add(now, self.jid)
      redis.call('hset', QlessJob.ns .. self.jid, 'state', 'depends')
      if delay > 0 then
        -- We've already put it in 'depends'. Now, we must just save the data
        -- for when it's scheduled
        queue_obj.depends.add(now, self.jid)
        redis.call('hset', QlessJob.ns .. self.jid, 'scheduled', now + delay)
      end
      return 'depends'
    else
      queue_obj.work.add(now, priority, self.jid)
      return 'waiting'
    end
  end
else
  -- Send a message out to log
  Qless.publish('log', cjson.encode({
    jid   = self.jid,
    event = 'completed',
    queue = queue
  }))

  redis.call('hmset', QlessJob.ns .. self.jid,
    'state', 'complete',
    'worker', '',
    'failure', '{}',
    'queue', '',
    'expires', 0,
    'remaining', tonumber(retries))

  -- Do the completion dance
  local count = Qless.config.get('jobs-history-count')
  local time  = Qless.config.get('jobs-history')

  -- These are the default values
  count = tonumber(count or 50000)
  time  = tonumber(time  or 7 * 24 * 60 * 60)

  -- Schedule this job for destructination eventually
  redis.call('zadd', 'ql:completed', now, self.jid)

  -- Now look at the expired job data. First, based on the current time
  local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time)
  -- Any jobs that need to be expired... delete
  for index, jid in ipairs(jids) do
    local tags = cjson.decode(
      redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
    for i, tag in ipairs(tags) do
      redis.call('zrem', 'ql:t:' .. tag, jid)
      redis.call('zincrby', 'ql:tags', -1, tag)
    end
    redis.call('del', QlessJob.ns .. jid)
    redis.call('del', QlessJob.ns .. jid .. '-history')
  end
  -- And now remove those from the queued-for-cleanup queue
  redis.call('zremrangebyscore', 'ql:completed', 0, now - time)

  -- Now take the all by the most recent 'count' ids
  jids = redis.call('zrange', 'ql:completed', 0, (-1-count))
  for index, jid in ipairs(jids) do
    local tags = cjson.decode(
      redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
    for i, tag in ipairs(tags) do
      redis.call('zrem', 'ql:t:' .. tag, jid)
      redis.call('zincrby', 'ql:tags', -1, tag)
    end
    redis.call('del', QlessJob.ns .. jid)
    redis.call('del', QlessJob.ns .. jid .. '-history')
  end
  redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count))

  -- Alright, if this has any dependents, then we should go ahead
  -- and unstick those guys.
  for i, j in ipairs(redis.call(
    'smembers', QlessJob.ns .. self.jid .. '-dependents')) do
    redis.call('srem', QlessJob.ns .. j .. '-dependencies', self.jid)
    if redis.call(
      'scard', QlessJob.ns .. j .. '-dependencies') == 0 then
      local q, p, scheduled = unpack(
        redis.call('hmget', QlessJob.ns .. j, 'queue', 'priority', 'scheduled'))
      if q then
        local queue = Qless.queue(q)
        queue.depends.remove(j)
        if scheduled then
          queue.scheduled.add(scheduled, j)
          redis.call('hset', QlessJob.ns .. j, 'state', 'scheduled')
          redis.call('hdel', QlessJob.ns .. j, 'scheduled')
        else
          queue.work.add(now, p, j)
          redis.call('hset', QlessJob.ns .. j, 'state', 'waiting')
        end
      end
    end
  end

  -- Delete our dependents key
  redis.call('del', QlessJob.ns .. self.jid .. '-dependents')

  return 'complete'
end
end

-- Fail(now, worker, group, message, [data])
-- -------------------------------------------------
-- Mark the particular job as failed, with the provided group, and a more
-- specific message. By `group`, we mean some phrase that might be one of
-- several categorical modes of failure. The `message` is something more
-- job-specific, like perhaps a traceback.
--
-- This method should __not__ be used to note that a job has been dropped or
-- has failed in a transient way. This method __should__ be used to note that
-- a job has something really wrong with it that must be remedied.
--
-- The motivation behind the `group` is so that similar errors can be grouped
-- together. Optionally, updated data can be provided for the job. A job in
-- any state can be marked as failed. If it has been given to a worker as a
-- job, then its subsequent requests to heartbeat or complete that job will
-- fail. Failed jobs are kept until they are canceled or completed.
--
-- __Returns__ the id of the failed job if successful, or `False` on failure.
--
-- Args:
--    1) jid
--    2) worker
--    3) group
--    4) message
--    5) the current time
--    6) [data]
function QlessJob:fail(now, worker, group, message, data)
local worker  = assert(worker           , 'Fail(): Arg "worker" missing')
local group   = assert(group            , 'Fail(): Arg "group" missing')
local message = assert(message          , 'Fail(): Arg "message" missing')

-- The bin is midnight of the provided day
-- 24 * 60 * 60 = 86400
local bin = now - (now % 86400)

if data then
  data = cjson.decode(data)
end

-- First things first, we should get the history
local queue, state, oldworker = unpack(redis.call(
  'hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'worker'))

-- If the job has been completed, we cannot fail it
if not state then
  error('Fail(): Job ' .. self.jid .. 'does not exist')
elseif state ~= 'running' then
  error('Fail(): Job ' .. self.jid .. 'not currently running: ' .. state)
elseif worker ~= oldworker then
  error('Fail(): Job ' .. self.jid .. ' running with another worker: ' ..
    oldworker)
end

-- Send out a log message
Qless.publish('log', cjson.encode({
  jid     = self.jid,
  event   = 'failed',
  worker  = worker,
  group   = group,
  message = message
}))

if redis.call('zscore', 'ql:tracked', self.jid) ~= false then
  Qless.publish('failed', self.jid)
end

-- Remove this job from the jobs that the worker that was running it has
redis.call('zrem', 'ql:w:' .. worker .. ':jobs', self.jid)

-- Now, take the element of the history for which our provided worker is
-- the worker, and update 'failed'
self:history(now, 'failed', {worker = worker, group = group})

-- Increment the number of failures for that queue for the
-- given day.
redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failures', 1)
redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failed'  , 1)

-- Now remove the instance from the schedule, and work queues for the
-- queue it's in
local queue_obj = Qless.queue(queue)
queue_obj.work.remove(self.jid)
queue_obj.locks.remove(self.jid)
queue_obj.scheduled.remove(self.jid)

-- The reason that this appears here is that the above will fail if the
-- job doesn't exist
if data then
  redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data))
end

redis.call('hmset', QlessJob.ns .. self.jid,
  'state', 'failed',
  'worker', '',
  'expires', '',
  'failure', cjson.encode({
    ['group']   = group,
    ['message'] = message,
    ['when']    = math.floor(now),
    ['worker']  = worker
  }))

-- Add this group of failure to the list of failures
redis.call('sadd', 'ql:failures', group)
-- And add this particular instance to the failed groups
redis.call('lpush', 'ql:f:' .. group, self.jid)

-- Here is where we'd intcrement stats about the particular stage
-- and possibly the workers

return self.jid
end

-- retry(now, queue, worker, [delay, [group, [message]]])
-- ------------------------------------------
-- This script accepts jid, queue, worker and delay for retrying a job. This
-- is similar in functionality to `put`, except that this counts against the
-- retries a job has for a stage.
--
-- Throws an exception if:
--      - the worker is not the worker with a lock on the job
--      - the job is not actually running
--
-- Otherwise, it returns the number of retries remaining. If the allowed
-- retries have been exhausted, then it is automatically failed, and a negative
-- number is returned.
--
-- If a group and message is provided, then if the retries are exhausted, then
-- the provided group and message will be used in place of the default
-- messaging about retries in the particular queue being exhausted
function QlessJob:retry(now, queue, worker, delay, group, message)
assert(queue , 'Retry(): Arg "queue" missing')
assert(worker, 'Retry(): Arg "worker" missing')
delay = assert(tonumber(delay or 0),
  'Retry(): Arg "delay" not a number: ' .. tostring(delay))

-- Let's see what the old priority, and tags were
local oldqueue, state, retries, oldworker, priority, failure = unpack(
  redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state',
    'retries', 'worker', 'priority', 'failure'))

-- If this isn't the worker that owns
if oldworker == false then
  error('Retry(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
  error('Retry(): Job ' .. self.jid .. ' is not currently running: ' ..
    state)
elseif oldworker ~= worker then
  error('Retry(): Job ' .. self.jid ..
    ' has been given to another worker: ' .. oldworker)
end

-- For each of these, decrement their retries. If any of them
-- have exhausted their retries, then we should mark them as
-- failed.
local remaining = tonumber(redis.call(
  'hincrby', QlessJob.ns .. self.jid, 'remaining', -1))
redis.call('hdel', QlessJob.ns .. self.jid, 'grace')

-- Remove it from the locks key of the old queue
Qless.queue(oldqueue).locks.remove(self.jid)

-- Remove this job from the worker that was previously working it
redis.call('zrem', 'ql:w:' .. worker .. ':jobs', self.jid)

if remaining < 0 then
  -- Now remove the instance from the schedule, and work queues for the
  -- queue it's in
  local group = group or 'failed-retries-' .. queue
  self:history(now, 'failed', {['group'] = group})

  redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'failed',
    'worker', '',
    'expires', '')
  -- If the failure has not already been set, then set it
  if group ~= nil and message ~= nil then
    redis.call('hset', QlessJob.ns .. self.jid,
      'failure', cjson.encode({
        ['group']   = group,
        ['message'] = message,
        ['when']    = math.floor(now),
        ['worker']  = worker
      })
    )
  else
    redis.call('hset', QlessJob.ns .. self.jid,
    'failure', cjson.encode({
      ['group']   = group,
      ['message'] =
        'Job exhausted retries in queue "' .. oldqueue .. '"',
      ['when']    = now,
      ['worker']  = unpack(self:data('worker'))
    }))
  end

  if redis.call('zscore', 'ql:tracked', self.jid) ~= false then
    Qless.publish('failed', self.jid)
  end

  -- Add this type of failure to the list of failures
  redis.call('sadd', 'ql:failures', group)
  -- And add this particular instance to the failed types
  redis.call('lpush', 'ql:f:' .. group, self.jid)
  -- Increment the count of the failed jobs
  local bin = now - (now % 86400)
  redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failures', 1)
  redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failed'  , 1)
else
  -- Put it in the queue again with a delay. Like put()
  local queue_obj = Qless.queue(queue)
  if delay > 0 then
    queue_obj.scheduled.add(now + delay, self.jid)
    redis.call('hset', QlessJob.ns .. self.jid, 'state', 'scheduled')
  else
    queue_obj.work.add(now, priority, self.jid)
    redis.call('hset', QlessJob.ns .. self.jid, 'state', 'waiting')
  end

  -- If a group and a message was provided, then we should save it
  if group ~= nil and message ~= nil then
    redis.call('hset', QlessJob.ns .. self.jid,
      'failure', cjson.encode({
        ['group']   = group,
        ['message'] = message,
        ['when']    = math.floor(now),
        ['worker']  = worker
      })
    )
  end
end

return math.floor(remaining)
end

-- Depends(jid, 'on', [jid, [jid, [...]]]
-- Depends(jid, 'off', [jid, [jid, [...]]])
-- Depends(jid, 'off', 'all')
-------------------------------------------------------------------------------
-- Add or remove dependencies a job has. If 'on' is provided, the provided
-- jids are added as dependencies. If 'off' and 'all' are provided, then all
-- the current dependencies are removed. If 'off' is provided and the next
-- argument is not 'all', then those jids are removed as dependencies.
--
-- If a job is not already in the 'depends' state, then this call will return
-- false. Otherwise, it will return true
function QlessJob:depends(now, command, ...)
assert(command, 'Depends(): Arg "command" missing')
local state = redis.call('hget', QlessJob.ns .. self.jid, 'state')
if state ~= 'depends' then
  error('Depends(): Job ' .. self.jid ..
    ' not in the depends state: ' .. tostring(state))
end

if command == 'on' then
  -- These are the jids we legitimately have to wait on
  for i, j in ipairs(arg) do
    -- Make sure it's something other than 'nil' or complete.
    local state = redis.call('hget', QlessJob.ns .. j, 'state')
    if (state and state ~= 'complete') then
      redis.call(
        'sadd', QlessJob.ns .. j .. '-dependents'  , self.jid)
      redis.call(
        'sadd', QlessJob.ns .. self.jid .. '-dependencies', j)
    end
  end
  return true
elseif command == 'off' then
  if arg[1] == 'all' then
    for i, j in ipairs(redis.call(
      'smembers', QlessJob.ns .. self.jid .. '-dependencies')) do
      redis.call('srem', QlessJob.ns .. j .. '-dependents', self.jid)
    end
    redis.call('del', QlessJob.ns .. self.jid .. '-dependencies')
    local q, p = unpack(redis.call(
      'hmget', QlessJob.ns .. self.jid, 'queue', 'priority'))
    if q then
      local queue_obj = Qless.queue(q)
      queue_obj.depends.remove(self.jid)
      queue_obj.work.add(now, p, self.jid)
      redis.call('hset', QlessJob.ns .. self.jid, 'state', 'waiting')
    end
  else
    for i, j in ipairs(arg) do
      redis.call('srem', QlessJob.ns .. j .. '-dependents', self.jid)
      redis.call(
        'srem', QlessJob.ns .. self.jid .. '-dependencies', j)
      if redis.call('scard',
        QlessJob.ns .. self.jid .. '-dependencies') == 0 then
        local q, p = unpack(redis.call(
          'hmget', QlessJob.ns .. self.jid, 'queue', 'priority'))
        if q then
          local queue_obj = Qless.queue(q)
          queue_obj.depends.remove(self.jid)
          queue_obj.work.add(now, p, self.jid)
          redis.call('hset',
            QlessJob.ns .. self.jid, 'state', 'waiting')
        end
      end
    end
  end
  return true
else
  error('Depends(): Argument "command" must be "on" or "off"')
end
end

-- Heartbeat
------------
-- Renew this worker's lock on this job. Throws an exception if:
--      - the job's been given to another worker
--      - the job's been completed
--      - the job's been canceled
--      - the job's not running
function QlessJob:heartbeat(now, worker, data)
assert(worker, 'Heatbeat(): Arg "worker" missing')

-- We should find the heartbeat interval for this queue
-- heartbeat. First, though, we need to find the queue
-- this particular job is in
local queue = redis.call('hget', QlessJob.ns .. self.jid, 'queue') or ''
local expires = now + tonumber(
  Qless.config.get(queue .. '-heartbeat') or
  Qless.config.get('heartbeat', 60))

if data then
  data = cjson.decode(data)
end

-- First, let's see if the worker still owns this job, and there is a
-- worker
local job_worker, state = unpack(
  redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state'))
if job_worker == false then
  -- This means the job doesn't exist
  error('Heartbeat(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
  error(
    'Heartbeat(): Job ' .. self.jid .. ' not currently running: ' .. state)
elseif job_worker ~= worker or #job_worker == 0 then
  error(
    'Heartbeat(): Job ' .. self.jid ..
    ' given out to another worker: ' .. job_worker)
else
  -- Otherwise, optionally update the user data, and the heartbeat
  if data then
    -- I don't know if this is wise, but I'm decoding and encoding
    -- the user data to hopefully ensure its sanity
    redis.call('hmset', QlessJob.ns .. self.jid, 'expires',
      expires, 'worker', worker, 'data', cjson.encode(data))
  else
    redis.call('hmset', QlessJob.ns .. self.jid,
      'expires', expires, 'worker', worker)
  end

  -- Update hwen this job was last updated on that worker
  -- Add this job to the list of jobs handled by this worker
  redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid)

  -- Make sure we this worker to the list of seen workers
  redis.call('zadd', 'ql:workers', now, worker)

  -- And now we should just update the locks
  local queue = Qless.queue(
    redis.call('hget', QlessJob.ns .. self.jid, 'queue'))
  queue.locks.add(expires, self.jid)
  return expires
end
end

-- Priority
-- --------
-- Update the priority of this job. If the job doesn't exist, throws an
-- exception
function QlessJob:priority(priority)
priority = assert(tonumber(priority),
  'Priority(): Arg "priority" missing or not a number: ' ..
  tostring(priority))

-- Get the queue the job is currently in, if any
local queue = redis.call('hget', QlessJob.ns .. self.jid, 'queue')

if queue == nil or queue == false then
  -- If the job doesn't exist, throw an error
  error('Priority(): Job ' .. self.jid .. ' does not exist')
elseif queue == '' then
  -- Just adjust the priority
  redis.call('hset', QlessJob.ns .. self.jid, 'priority', priority)
  return priority
else
  -- Adjust the priority and see if it's a candidate for updating
  -- its priority in the queue it's currently in
  local queue_obj = Qless.queue(queue)
  if queue_obj.work.score(self.jid) then
    queue_obj.work.add(0, priority, self.jid)
  end
  redis.call('hset', QlessJob.ns .. self.jid, 'priority', priority)
  return priority
end
end

-- Update the jobs' attributes with the provided dictionary
function QlessJob:update(data)
local tmp = {}
for k, v in pairs(data) do
  table.insert(tmp, k)
  table.insert(tmp, v)
end
redis.call('hmset', QlessJob.ns .. self.jid, unpack(tmp))
end

-- Times out the job now rather than when its lock is normally set to expire
function QlessJob:timeout(now)
local queue_name, state, worker = unpack(redis.call('hmget',
  QlessJob.ns .. self.jid, 'queue', 'state', 'worker'))
if queue_name == nil or queue_name == false then
  error('Timeout(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
  error('Timeout(): Job ' .. self.jid .. ' not running')
else
  -- Time out the job
  self:history(now, 'timed-out')
  local queue = Qless.queue(queue_name)
  queue.locks.remove(self.jid)
  queue.work.add(now, '+inf', self.jid)
  redis.call('hmset', QlessJob.ns .. self.jid,
    'state', 'stalled', 'expires', 0)
  local encoded = cjson.encode({
    jid    = self.jid,
    event  = 'lock_lost',
    worker = worker
  })
  Qless.publish('w:' .. worker, encoded)
  Qless.publish('log', encoded)
  return queue_name
end
end

-- Return whether or not this job exists
function QlessJob:exists()
return redis.call('exists', QlessJob.ns .. self.jid) == 1
end

-- Get or append to history
function QlessJob:history(now, what, item)
-- First, check if there's an old-style history, and update it if there is
local history = redis.call('hget', QlessJob.ns .. self.jid, 'history')
if history then
  history = cjson.decode(history)
  for i, value in ipairs(history) do
    redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
      cjson.encode({math.floor(value.put), 'put', {q = value.q}}))

    -- If there's any popped time
    if value.popped then
      redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
        cjson.encode({math.floor(value.popped), 'popped',
          {worker = value.worker}}))
    end

    -- If there's any failure
    if value.failed then
      redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
        cjson.encode(
          {math.floor(value.failed), 'failed', nil}))
    end

    -- If it was completed
    if value.done then
      redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
        cjson.encode(
          {math.floor(value.done), 'done', nil}))
    end
  end
  -- With all this ported forward, delete the old-style history
  redis.call('hdel', QlessJob.ns .. self.jid, 'history')
end

-- Now to the meat of the function
if what == nil then
  -- Get the history
  local response = {}
  for i, value in ipairs(redis.call('lrange',
    QlessJob.ns .. self.jid .. '-history', 0, -1)) do
    value = cjson.decode(value)
    local dict = value[3] or {}
    dict['when'] = value[1]
    dict['what'] = value[2]
    table.insert(response, dict)
  end
  return response
else
  -- Append to the history. If the length of the history should be limited,
  -- then we'll truncate it.
  local count = tonumber(Qless.config.get('max-job-history', 100))
  if count > 0 then
    -- We'll always keep the first item around
    local obj = redis.call('lpop', QlessJob.ns .. self.jid .. '-history')
    redis.call('ltrim', QlessJob.ns .. self.jid .. '-history', -count + 2, -1)
    if obj ~= nil and obj ~= false then
      redis.call('lpush', QlessJob.ns .. self.jid .. '-history', obj)
    end
  end
  return redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
    cjson.encode({math.floor(now), what, item}))
end
end
-------------------------------------------------------------------------------
-- Queue class
-------------------------------------------------------------------------------
-- Return a queue object
function Qless.queue(name)
assert(name, 'Queue(): no queue name provided')
local queue = {}
setmetatable(queue, QlessQueue)
queue.name = name

-- Access to our work
queue.work = {
  peek = function(count)
    if count == 0 then
      return {}
    end
    local jids = {}
    for index, jid in ipairs(redis.call(
      'zrevrange', queue:prefix('work'), 0, count - 1)) do
      table.insert(jids, jid)
    end
    return jids
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('work'), unpack(arg))
    end
  end, add = function(now, priority, jid)
    if priority ~= '+inf' then
      priority = priority - (now / 10000000000)
    end
    return redis.call('zadd',
      queue:prefix('work'), priority, jid)
  end, score = function(jid)
    return redis.call('zscore', queue:prefix('work'), jid)
  end, length = function()
    return redis.call('zcard', queue:prefix('work'))
  end
}

-- Access to our locks
queue.locks = {
  expired = function(now, offset, count)
    return redis.call('zrangebyscore',
      queue:prefix('locks'), '-inf', now, 'LIMIT', offset, count)
  end, peek = function(now, offset, count)
    return redis.call('zrangebyscore', queue:prefix('locks'),
      now, '+inf', 'LIMIT', offset, count)
  end, add = function(expires, jid)
    redis.call('zadd', queue:prefix('locks'), expires, jid)
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('locks'), unpack(arg))
    end
  end, running = function(now)
    return redis.call('zcount', queue:prefix('locks'), now, '+inf')
  end, length = function(now)
    -- If a 'now' is provided, we're interested in how many are before
    -- that time
    if now then
      return redis.call('zcount', queue:prefix('locks'), 0, now)
    else
      return redis.call('zcard', queue:prefix('locks'))
    end
  end
}

-- Access to our dependent jobs
queue.depends = {
  peek = function(now, offset, count)
    return redis.call('zrange',
      queue:prefix('depends'), offset, offset + count - 1)
  end, add = function(now, jid)
    redis.call('zadd', queue:prefix('depends'), now, jid)
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('depends'), unpack(arg))
    end
  end, length = function()
    return redis.call('zcard', queue:prefix('depends'))
  end
}

-- Access to our scheduled jobs
queue.scheduled = {
  peek = function(now, offset, count)
    return redis.call('zrange',
      queue:prefix('scheduled'), offset, offset + count - 1)
  end, ready = function(now, offset, count)
    return redis.call('zrangebyscore',
      queue:prefix('scheduled'), 0, now, 'LIMIT', offset, count)
  end, add = function(when, jid)
    redis.call('zadd', queue:prefix('scheduled'), when, jid)
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('scheduled'), unpack(arg))
    end
  end, length = function()
    return redis.call('zcard', queue:prefix('scheduled'))
  end
}

-- Access to our recurring jobs
queue.recurring = {
  peek = function(now, offset, count)
    return redis.call('zrangebyscore', queue:prefix('recur'),
      0, now, 'LIMIT', offset, count)
  end, ready = function(now, offset, count)
  end, add = function(when, jid)
    redis.call('zadd', queue:prefix('recur'), when, jid)
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('recur'), unpack(arg))
    end
  end, update = function(increment, jid)
    redis.call('zincrby', queue:prefix('recur'), increment, jid)
  end, score = function(jid)
    return redis.call('zscore', queue:prefix('recur'), jid)
  end, length = function()
    return redis.call('zcard', queue:prefix('recur'))
  end
}
return queue
end

-- Return the prefix for this particular queue
function QlessQueue:prefix(group)
if group then
  return QlessQueue.ns..self.name..'-'..group
else
  return QlessQueue.ns..self.name
end
end

-- Stats(now, date)
-- ---------------------
-- Return the current statistics for a given queue on a given date. The
-- results are returned are a JSON blob:
--
--
--  {
--      # These are unimplemented as of yet
--      'failed': 3,
--      'retries': 5,
--      'wait' : {
--          'total'    : ...,
--          'mean'     : ...,
--          'variance' : ...,
--          'histogram': [
--              ...
--          ]
--      }, 'run': {
--          'total'    : ...,
--          'mean'     : ...,
--          'variance' : ...,
--          'histogram': [
--              ...
--          ]
--      }
--  }
--
-- The histogram's data points are at the second resolution for the first
-- minute, the minute resolution for the first hour, the 15-minute resolution
-- for the first day, the hour resolution for the first 3 days, and then at
-- the day resolution from there on out. The `histogram` key is a list of
-- those values.
function QlessQueue:stats(now, date)
date = assert(tonumber(date),
  'Stats(): Arg "date" missing or not a number: '.. (date or 'nil'))

-- The bin is midnight of the provided day
-- 24 * 60 * 60 = 86400
local bin = date - (date % 86400)

-- This a table of all the keys we want to use in order to produce a histogram
local histokeys = {
  's0','s1','s2','s3','s4','s5','s6','s7','s8','s9','s10','s11','s12','s13','s14','s15','s16','s17','s18','s19','s20','s21','s22','s23','s24','s25','s26','s27','s28','s29','s30','s31','s32','s33','s34','s35','s36','s37','s38','s39','s40','s41','s42','s43','s44','s45','s46','s47','s48','s49','s50','s51','s52','s53','s54','s55','s56','s57','s58','s59',
  'm1','m2','m3','m4','m5','m6','m7','m8','m9','m10','m11','m12','m13','m14','m15','m16','m17','m18','m19','m20','m21','m22','m23','m24','m25','m26','m27','m28','m29','m30','m31','m32','m33','m34','m35','m36','m37','m38','m39','m40','m41','m42','m43','m44','m45','m46','m47','m48','m49','m50','m51','m52','m53','m54','m55','m56','m57','m58','m59',
  'h1','h2','h3','h4','h5','h6','h7','h8','h9','h10','h11','h12','h13','h14','h15','h16','h17','h18','h19','h20','h21','h22','h23',
  'd1','d2','d3','d4','d5','d6'
}

local mkstats = function(name, bin, queue)
  -- The results we'll be sending back
  local results = {}

  local key = 'ql:s:' .. name .. ':' .. bin .. ':' .. queue
  local count, mean, vk = unpack(redis.call('hmget', key, 'total', 'mean', 'vk'))

  count = tonumber(count) or 0
  mean  = tonumber(mean) or 0
  vk    = tonumber(vk)

  results.count     = count or 0
  results.mean      = mean  or 0
  results.histogram = {}

  if not count then
    results.std = 0
  else
    if count > 1 then
      results.std = math.sqrt(vk / (count - 1))
    else
      results.std = 0
    end
  end

  local histogram = redis.call('hmget', key, unpack(histokeys))
  for i=1,#histokeys do
    table.insert(results.histogram, tonumber(histogram[i]) or 0)
  end
  return results
end

local retries, failed, failures = unpack(redis.call('hmget', 'ql:s:stats:' .. bin .. ':' .. self.name, 'retries', 'failed', 'failures'))
return {
  retries  = tonumber(retries  or 0),
  failed   = tonumber(failed   or 0),
  failures = tonumber(failures or 0),
  wait     = mkstats('wait', bin, self.name),
  run      = mkstats('run' , bin, self.name)
}
end

-- Peek
-------
-- Examine the next jobs that would be popped from the queue without actually
-- popping them.
function QlessQueue:peek(now, count)
count = assert(tonumber(count),
  'Peek(): Arg "count" missing or not a number: ' .. tostring(count))

-- These are the ids that we're going to return. We'll begin with any jobs
-- that have lost their locks
local jids = self.locks.expired(now, 0, count)

-- If we still need jobs in order to meet demand, then we should
-- look for all the recurring jobs that need jobs run
self:check_recurring(now, count - #jids)

-- Now we've checked __all__ the locks for this queue the could
-- have expired, and are no more than the number requested. If
-- we still need values in order to meet the demand, then we
-- should check if any scheduled items, and if so, we should
-- insert them to ensure correctness when pulling off the next
-- unit of work.
self:check_scheduled(now, count - #jids)

-- With these in place, we can expand this list of jids based on the work
-- queue itself and the priorities therein
tbl_extend(jids, self.work.peek(count - #jids))

return jids
end

-- Return true if this queue is paused
function QlessQueue:paused()
return redis.call('sismember', 'ql:paused_queues', self.name) == 1
end

-- Pause this queue
--
-- Note: long term, we have discussed adding a rate-limiting
-- feature to qless-core, which would be more flexible and
-- could be used for pausing (i.e. pause = set the rate to 0).
-- For now, this is far simpler, but we should rewrite this
-- in terms of the rate limiting feature if/when that is added.
function QlessQueue.pause(now, ...)
redis.call('sadd', 'ql:paused_queues', unpack(arg))
end

-- Unpause this queue
function QlessQueue.unpause(...)
redis.call('srem', 'ql:paused_queues', unpack(arg))
end

-- Checks for expired locks, scheduled and recurring jobs, returning any
-- jobs that are ready to be processes
function QlessQueue:pop(now, worker, count)
assert(worker, 'Pop(): Arg "worker" missing')
count = assert(tonumber(count),
  'Pop(): Arg "count" missing or not a number: ' .. tostring(count))

-- We should find the heartbeat interval for this queue heartbeat
local expires = now + tonumber(
  Qless.config.get(self.name .. '-heartbeat') or
  Qless.config.get('heartbeat', 60))

-- If this queue is paused, then return no jobs
if self:paused() then
  return {}
end

-- Make sure we this worker to the list of seen workers
redis.call('zadd', 'ql:workers', now, worker)

-- Check our max concurrency, and limit the count
local max_concurrency = tonumber(
  Qless.config.get(self.name .. '-max-concurrency', 0))

if max_concurrency > 0 then
  -- Allow at most max_concurrency - #running
  local allowed = math.max(0, max_concurrency - self.locks.running(now))
  count = math.min(allowed, count)
  if count == 0 then
    return {}
  end
end

local jids = self:invalidate_locks(now, count)
-- Now we've checked __all__ the locks for this queue the could
-- have expired, and are no more than the number requested.

-- If we still need jobs in order to meet demand, then we should
-- look for all the recurring jobs that need jobs run
self:check_recurring(now, count - #jids)

-- If we still need values in order to meet the demand, then we
-- should check if any scheduled items, and if so, we should
-- insert them to ensure correctness when pulling off the next
-- unit of work.
self:check_scheduled(now, count - #jids)

-- With these in place, we can expand this list of jids based on the work
-- queue itself and the priorities therein
tbl_extend(jids, self.work.peek(count - #jids))

local state
for index, jid in ipairs(jids) do
  local job = Qless.job(jid)
  state = unpack(job:data('state'))
  job:history(now, 'popped', {worker = worker})

  -- Update the wait time statistics
  local time = tonumber(
    redis.call('hget', QlessJob.ns .. jid, 'time') or now)
  local waiting = now - time
  self:stat(now, 'wait', waiting)
  redis.call('hset', QlessJob.ns .. jid,
    'time', string.format("%.20f", now))

  -- Add this job to the list of jobs handled by this worker
  redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid)

  -- Update the jobs data, and add its locks, and return the job
  job:update({
    worker  = worker,
    expires = expires,
    state   = 'running'
  })

  self.locks.add(expires, jid)

  local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false
  if tracked then
    Qless.publish('popped', jid)
  end
end

-- If we are returning any jobs, then we should remove them from the work
-- queue
self.work.remove(unpack(jids))

return jids
end

-- Update the stats for this queue
function QlessQueue:stat(now, stat, val)
-- The bin is midnight of the provided day
local bin = now - (now % 86400)
local key = 'ql:s:' .. stat .. ':' .. bin .. ':' .. self.name

-- Get the current data
local count, mean, vk = unpack(
  redis.call('hmget', key, 'total', 'mean', 'vk'))

-- If there isn't any data there presently, then we must initialize it
count = count or 0
if count == 0 then
  mean  = val
  vk    = 0
  count = 1
else
  count = count + 1
  local oldmean = mean
  mean  = mean + (val - mean) / count
  vk    = vk + (val - mean) * (val - oldmean)
end

-- Now, update the histogram
-- - `s1`, `s2`, ..., -- second-resolution histogram counts
-- - `m1`, `m2`, ..., -- minute-resolution
-- - `h1`, `h2`, ..., -- hour-resolution
-- - `d1`, `d2`, ..., -- day-resolution
val = math.floor(val)
if val < 60 then -- seconds
  redis.call('hincrby', key, 's' .. val, 1)
elseif val < 3600 then -- minutes
  redis.call('hincrby', key, 'm' .. math.floor(val / 60), 1)
elseif val < 86400 then -- hours
  redis.call('hincrby', key, 'h' .. math.floor(val / 3600), 1)
else -- days
  redis.call('hincrby', key, 'd' .. math.floor(val / 86400), 1)
end
redis.call('hmset', key, 'total', count, 'mean', mean, 'vk', vk)
end

-- Put(now, jid, klass, data, delay,
--     [priority, p],
--     [tags, t],
--     [retries, r],
--     [depends, '[...]'])
-- -----------------------
-- Insert a job into the queue with the given priority, tags, delay, klass and
-- data.
function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)
assert(jid  , 'Put(): Arg "jid" missing')
assert(klass, 'Put(): Arg "klass" missing')
local data = assert(cjson.decode(raw_data),
  'Put(): Arg "data" missing or not JSON: ' .. tostring(raw_data))
delay = assert(tonumber(delay),
  'Put(): Arg "delay" not a number: ' .. tostring(delay))

-- Read in all the optional parameters. All of these must come in pairs, so
-- if we have an odd number of extra args, raise an error
if #arg % 2 == 1 then
  error('Odd number of additional args: ' .. tostring(arg))
end
local options = {}
for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end

-- Let's see what the old priority and tags were
local job = Qless.job(jid)
local priority, tags, oldqueue, state, failure, retries, oldworker =
  unpack(redis.call('hmget', QlessJob.ns .. jid, 'priority', 'tags',
    'queue', 'state', 'failure', 'retries', 'worker'))

-- If there are old tags, then we should remove the tags this job has
if tags then
  Qless.tag(now, 'remove', jid, unpack(cjson.decode(tags)))
end

-- Sanity check on optional args
retries  = assert(tonumber(options['retries']  or retries or 5) ,
  'Put(): Arg "retries" not a number: ' .. tostring(options['retries']))
tags     = assert(cjson.decode(options['tags'] or tags or '[]' ),
  'Put(): Arg "tags" not JSON'          .. tostring(options['tags']))
priority = assert(tonumber(options['priority'] or priority or 0),
  'Put(): Arg "priority" not a number'  .. tostring(options['priority']))
local depends = assert(cjson.decode(options['depends'] or '[]') ,
  'Put(): Arg "depends" not JSON: '     .. tostring(options['depends']))

-- If the job has old dependencies, determine which dependencies are
-- in the new dependencies but not in the old ones, and which are in the
-- old ones but not in the new
if #depends > 0 then
  -- This makes it easier to check if it's in the new list
  local new = {}
  for _, d in ipairs(depends) do new[d] = 1 end

  -- Now find what's in the original, but not the new
  local original = redis.call(
    'smembers', QlessJob.ns .. jid .. '-dependencies')
  for _, dep in pairs(original) do
    if new[dep] == nil or new[dep] == false then
      -- Remove k as a dependency
      redis.call('srem', QlessJob.ns .. dep .. '-dependents'  , jid)
      redis.call('srem', QlessJob.ns .. jid .. '-dependencies', dep)
    end
  end
end

-- Send out a log message
Qless.publish('log', cjson.encode({
  jid   = jid,
  event = 'put',
  queue = self.name
}))

-- Update the history to include this new change
job:history(now, 'put', {q = self.name})

-- If this item was previously in another queue, then we should remove it from there
if oldqueue then
  local queue_obj = Qless.queue(oldqueue)
  queue_obj.work.remove(jid)
  queue_obj.locks.remove(jid)
  queue_obj.depends.remove(jid)
  queue_obj.scheduled.remove(jid)
end

-- If this had previously been given out to a worker, make sure to remove it
-- from that worker's jobs
if oldworker and oldworker ~= '' then
  redis.call('zrem', 'ql:w:' .. oldworker .. ':jobs', jid)
  -- If it's a different worker that's putting this job, send a notification
  -- to the last owner of the job
  if oldworker ~= worker then
    -- We need to inform whatever worker had that job
    local encoded = cjson.encode({
      jid    = jid,
      event  = 'lock_lost',
      worker = oldworker
    })
    Qless.publish('w:' .. oldworker, encoded)
    Qless.publish('log', encoded)
  end
end

-- If the job was previously in the 'completed' state, then we should
-- remove it from being enqueued for destructination
if state == 'complete' then
  redis.call('zrem', 'ql:completed', jid)
end

-- Add this job to the list of jobs tagged with whatever tags were supplied
for i, tag in ipairs(tags) do
  redis.call('zadd', 'ql:t:' .. tag, now, jid)
  redis.call('zincrby', 'ql:tags', 1, tag)
end

-- If we're in the failed state, remove all of our data
if state == 'failed' then
  failure = cjson.decode(failure)
  -- We need to make this remove it from the failed queues
  redis.call('lrem', 'ql:f:' .. failure.group, 0, jid)
  if redis.call('llen', 'ql:f:' .. failure.group) == 0 then
    redis.call('srem', 'ql:failures', failure.group)
  end
  -- The bin is midnight of the provided day
  -- 24 * 60 * 60 = 86400
  local bin = failure.when - (failure.when % 86400)
  -- We also need to decrement the stats about the queue on
  -- the day that this failure actually happened.
  redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. self.name, 'failed'  , -1)
end

-- First, let's save its data
redis.call('hmset', QlessJob.ns .. jid,
  'jid'      , jid,
  'klass'    , klass,
  'data'     , raw_data,
  'priority' , priority,
  'tags'     , cjson.encode(tags),
  'state'    , ((delay > 0) and 'scheduled') or 'waiting',
  'worker'   , '',
  'expires'  , 0,
  'queue'    , self.name,
  'retries'  , retries,
  'remaining', retries,
  'time'     , string.format("%.20f", now))

-- These are the jids we legitimately have to wait on
for i, j in ipairs(depends) do
  -- Make sure it's something other than 'nil' or complete.
  local state = redis.call('hget', QlessJob.ns .. j, 'state')
  if (state and state ~= 'complete') then
    redis.call('sadd', QlessJob.ns .. j .. '-dependents'  , jid)
    redis.call('sadd', QlessJob.ns .. jid .. '-dependencies', j)
  end
end

-- Now, if a delay was provided, and if it's in the future,
-- then we'll have to schedule it. Otherwise, we're just
-- going to add it to the work queue.
if delay > 0 then
  if redis.call('scard', QlessJob.ns .. jid .. '-dependencies') > 0 then
    -- We've already put it in 'depends'. Now, we must just save the data
    -- for when it's scheduled
    self.depends.add(now, jid)
    redis.call('hmset', QlessJob.ns .. jid,
      'state', 'depends',
      'scheduled', now + delay)
  else
    self.scheduled.add(now + delay, jid)
  end
else
  if redis.call('scard', QlessJob.ns .. jid .. '-dependencies') > 0 then
    self.depends.add(now, jid)
    redis.call('hset', QlessJob.ns .. jid, 'state', 'depends')
  else
    self.work.add(now, priority, jid)
  end
end

-- Lastly, we're going to make sure that this item is in the
-- set of known queues. We should keep this sorted by the
-- order in which we saw each of these queues
if redis.call('zscore', 'ql:queues', self.name) == false then
  redis.call('zadd', 'ql:queues', now, self.name)
end

if redis.call('zscore', 'ql:tracked', jid) ~= false then
  Qless.publish('put', jid)
end

return jid
end

-- Move `count` jobs out of the failed state and into this queue
function QlessQueue:unfail(now, group, count)
assert(group, 'Unfail(): Arg "group" missing')
count = assert(tonumber(count or 25),
  'Unfail(): Arg "count" not a number: ' .. tostring(count))

-- Get up to that many jobs, and we'll put them in the appropriate queue
local jids = redis.call('lrange', 'ql:f:' .. group, -count, -1)

-- And now set each job's state, and put it into the appropriate queue
local toinsert = {}
for index, jid in ipairs(jids) do
  local job = Qless.job(jid)
  local data = job:data()
  job:history(now, 'put', {q = self.name})
  redis.call('hmset', QlessJob.ns .. data.jid,
    'state'    , 'waiting',
    'worker'   , '',
    'expires'  , 0,
    'queue'    , self.name,
    'remaining', data.retries or 5)
  self.work.add(now, data.priority, data.jid)
end

-- Remove these jobs from the failed state
redis.call('ltrim', 'ql:f:' .. group, 0, -count - 1)
if (redis.call('llen', 'ql:f:' .. group) == 0) then
  redis.call('srem', 'ql:failures', group)
end

return #jids
end

-- Recur a job of type klass in this queue
function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
assert(jid  , 'RecurringJob On(): Arg "jid" missing')
assert(klass, 'RecurringJob On(): Arg "klass" missing')
assert(spec , 'RecurringJob On(): Arg "spec" missing')
local data = assert(cjson.decode(raw_data),
  'RecurringJob On(): Arg "data" not JSON: ' .. tostring(raw_data))

-- At some point in the future, we may have different types of recurring
-- jobs, but for the time being, we only have 'interval'-type jobs
if spec == 'interval' then
  local interval = assert(tonumber(arg[1]),
    'Recur(): Arg "interval" not a number: ' .. tostring(arg[1]))
  local offset   = assert(tonumber(arg[2]),
    'Recur(): Arg "offset" not a number: '   .. tostring(arg[2]))
  if interval <= 0 then
    error('Recur(): Arg "interval" must be greater than 0')
  end

  -- Read in all the optional parameters. All of these must come in
  -- pairs, so if we have an odd number of extra args, raise an error
  if #arg % 2 == 1 then
    error('Odd number of additional args: ' .. tostring(arg))
  end

  -- Read in all the optional parameters
  local options = {}
  for i = 3, #arg, 2 do options[arg[i]] = arg[i + 1] end
  options.tags = assert(cjson.decode(options.tags or '{}'),
    'Recur(): Arg "tags" must be JSON string array: ' .. tostring(
      options.tags))
  options.priority = assert(tonumber(options.priority or 0),
    'Recur(): Arg "priority" not a number: ' .. tostring(
      options.priority))
  options.retries = assert(tonumber(options.retries  or 0),
    'Recur(): Arg "retries" not a number: ' .. tostring(
      options.retries))
  options.backlog = assert(tonumber(options.backlog  or 0),
    'Recur(): Arg "backlog" not a number: ' .. tostring(
      options.backlog))

  local count, old_queue = unpack(redis.call('hmget', 'ql:r:' .. jid, 'count', 'queue'))
  count = count or 0

  -- If it has previously been in another queue, then we should remove
  -- some information about it
  if old_queue then
    Qless.queue(old_queue).recurring.remove(jid)
  end

  -- Do some insertions
  redis.call('hmset', 'ql:r:' .. jid,
    'jid'     , jid,
    'klass'   , klass,
    'data'    , raw_data,
    'priority', options.priority,
    'tags'    , cjson.encode(options.tags or {}),
    'state'   , 'recur',
    'queue'   , self.name,
    'type'    , 'interval',
    -- How many jobs we've spawned from this
    'count'   , count,
    'interval', interval,
    'retries' , options.retries,
    'backlog' , options.backlog)
  -- Now, we should schedule the next run of the job
  self.recurring.add(now + offset, jid)

  -- Lastly, we're going to make sure that this item is in the
  -- set of known queues. We should keep this sorted by the
  -- order in which we saw each of these queues
  if redis.call('zscore', 'ql:queues', self.name) == false then
    redis.call('zadd', 'ql:queues', now, self.name)
  end

  return jid
else
  error('Recur(): schedule type "' .. tostring(spec) .. '" unknown')
end
end

-- Return the length of the queue
function QlessQueue:length()
return  self.locks.length() + self.work.length() + self.scheduled.length()
end

-------------------------------------------------------------------------------
-- Housekeeping methods
-------------------------------------------------------------------------------
-- Instantiate any recurring jobs that are ready
function QlessQueue:check_recurring(now, count)
-- This is how many jobs we've moved so far
local moved = 0
-- These are the recurring jobs that need work
local r = self.recurring.peek(now, 0, count)
for index, jid in ipairs(r) do
  -- For each of the jids that need jobs scheduled, first
  -- get the last time each of them was run, and then increment
  -- it by its interval. While this time is less than now,
  -- we need to keep putting jobs on the queue
  local klass, data, priority, tags, retries, interval, backlog = unpack(
    redis.call('hmget', 'ql:r:' .. jid, 'klass', 'data', 'priority',
      'tags', 'retries', 'interval', 'backlog'))
  local _tags = cjson.decode(tags)
  local score = math.floor(tonumber(self.recurring.score(jid)))
  interval = tonumber(interval)

  -- If the backlog is set for this job, then see if it's been a long
  -- time since the last pop
  backlog = tonumber(backlog or 0)
  if backlog ~= 0 then
    -- Check how many jobs we could concievably generate
    local num = ((now - score) / interval)
    if num > backlog then
      -- Update the score
      score = score + (
        math.ceil(num - backlog) * interval
      )
    end
  end

  -- We're saving this value so that in the history, we can accurately
  -- reflect when the job would normally have been scheduled
  while (score <= now) and (moved < count) do
    local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
    moved = moved + 1

    local child_jid = jid .. '-' .. count

    -- Add this job to the list of jobs tagged with whatever tags were
    -- supplied
    for i, tag in ipairs(_tags) do
      redis.call('zadd', 'ql:t:' .. tag, now, child_jid)
      redis.call('zincrby', 'ql:tags', 1, tag)
    end

    -- First, let's save its data
    redis.call('hmset', QlessJob.ns .. child_jid,
      'jid'             , child_jid,
      'klass'           , klass,
      'data'            , data,
      'priority'        , priority,
      'tags'            , tags,
      'state'           , 'waiting',
      'worker'          , '',
      'expires'         , 0,
      'queue'           , self.name,
      'retries'         , retries,
      'remaining'       , retries,
      'time'            , string.format("%.20f", score),
      'spawned_from_jid', jid)
    Qless.job(child_jid):history(score, 'put', {q = self.name})

    -- Now, if a delay was provided, and if it's in the future,
    -- then we'll have to schedule it. Otherwise, we're just
    -- going to add it to the work queue.
    self.work.add(score, priority, child_jid)

    score = score + interval
    self.recurring.add(score, jid)
  end
end
end

-- Check for any jobs that have been scheduled, and shovel them onto
-- the work queue. Returns nothing, but afterwards, up to `count`
-- scheduled jobs will be moved into the work queue
function QlessQueue:check_scheduled(now, count)
-- zadd is a list of arguments that we'll be able to use to
-- insert into the work queue
local scheduled = self.scheduled.ready(now, 0, count)
for index, jid in ipairs(scheduled) do
  -- With these in hand, we'll have to go out and find the
  -- priorities of these jobs, and then we'll insert them
  -- into the work queue and then when that's complete, we'll
  -- remove them from the scheduled queue
  local priority = tonumber(
    redis.call('hget', QlessJob.ns .. jid, 'priority') or 0)
  self.work.add(now, priority, jid)
  self.scheduled.remove(jid)

  -- We should also update them to have the state 'waiting'
  -- instead of 'scheduled'
  redis.call('hset', QlessJob.ns .. jid, 'state', 'waiting')
end
end

-- Check for and invalidate any locks that have been lost. Returns the
-- list of jids that have been invalidated
function QlessQueue:invalidate_locks(now, count)
local jids = {}
-- Iterate through all the expired locks and add them to the list
-- of keys that we'll return
for index, jid in ipairs(self.locks.expired(now, 0, count)) do
  -- Remove this job from the jobs that the worker that was running it
  -- has
  local worker, failure = unpack(
    redis.call('hmget', QlessJob.ns .. jid, 'worker', 'failure'))
  redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)

  -- We'll provide a grace period after jobs time out for them to give
  -- some indication of the failure mode. After that time, however, we'll
  -- consider the worker dust in the wind
  local grace_period = tonumber(Qless.config.get('grace-period'))

  -- Whether or not we've already sent a coutesy message
  local courtesy_sent = tonumber(
    redis.call('hget', QlessJob.ns .. jid, 'grace') or 0)

  -- If the remaining value is an odd multiple of 0.5, then we'll assume
  -- that we're just sending the message. Otherwise, it's time to
  -- actually hand out the work to another worker
  local send_message = (courtesy_sent ~= 1)
  local invalidate   = not send_message

  -- If the grace period has been disabled, then we'll do both.
  if grace_period <= 0 then
    send_message = true
    invalidate   = true
  end

  if send_message then
    -- This is where we supply a courtesy message and give the worker
    -- time to provide a failure message
    if redis.call('zscore', 'ql:tracked', jid) ~= false then
      Qless.publish('stalled', jid)
    end
    Qless.job(jid):history(now, 'timed-out')
    redis.call('hset', QlessJob.ns .. jid, 'grace', 1)

    -- Send a message to let the worker know that its lost its lock on
    -- the job
    local encoded = cjson.encode({
      jid    = jid,
      event  = 'lock_lost',
      worker = worker
    })
    Qless.publish('w:' .. worker, encoded)
    Qless.publish('log', encoded)
    self.locks.add(now + grace_period, jid)

    -- If we got any expired locks, then we should increment the
    -- number of retries for this stage for this bin. The bin is
    -- midnight of the provided day
    local bin = now - (now % 86400)
    redis.call('hincrby',
      'ql:s:stats:' .. bin .. ':' .. self.name, 'retries', 1)
  end

  if invalidate then
    -- Unset the grace period attribute so that next time we'll send
    -- the grace period
    redis.call('hdel', QlessJob.ns .. jid, 'grace', 0)

    -- See how many remaining retries the job has
    local remaining = tonumber(redis.call(
      'hincrby', QlessJob.ns .. jid, 'remaining', -1))

    -- This is where we actually have to time out the work
    if remaining < 0 then
      -- Now remove the instance from the schedule, and work queues
      -- for the queue it's in
      self.work.remove(jid)
      self.locks.remove(jid)
      self.scheduled.remove(jid)

      local group = 'failed-retries-' .. Qless.job(jid):data()['queue']
      local job = Qless.job(jid)
      job:history(now, 'failed', {group = group})
      redis.call('hmset', QlessJob.ns .. jid, 'state', 'failed',
        'worker', '',
        'expires', '')
      -- If the failure has not already been set, then set it
      redis.call('hset', QlessJob.ns .. jid,
      'failure', cjson.encode({
        ['group']   = group,
        ['message'] =
          'Job exhausted retries in queue "' .. self.name .. '"',
        ['when']    = now,
        ['worker']  = unpack(job:data('worker'))
      }))

      -- Add this type of failure to the list of failures
      redis.call('sadd', 'ql:failures', group)
      -- And add this particular instance to the failed types
      redis.call('lpush', 'ql:f:' .. group, jid)

      if redis.call('zscore', 'ql:tracked', jid) ~= false then
        Qless.publish('failed', jid)
      end
      Qless.publish('log', cjson.encode({
        jid     = jid,
        event   = 'failed',
        group   = group,
        worker  = worker,
        message =
          'Job exhausted retries in queue "' .. self.name .. '"'
      }))

      -- Increment the count of the failed jobs
      local bin = now - (now % 86400)
      redis.call('hincrby',
        'ql:s:stats:' .. bin .. ':' .. self.name, 'failures', 1)
      redis.call('hincrby',
        'ql:s:stats:' .. bin .. ':' .. self.name, 'failed'  , 1)
    else
      table.insert(jids, jid)
    end
  end
end

return jids
end

-- Forget the provided queues. As in, remove them from the list of known queues
function QlessQueue.deregister(...)
redis.call('zrem', Qless.ns .. 'queues', unpack(arg))
end

-- Return information about a particular queue, or all queues
--  [
--      {
--          'name': 'testing',
--          'stalled': 2,
--          'waiting': 5,
--          'running': 5,
--          'scheduled': 10,
--          'depends': 5,
--          'recurring': 0
--      }, {
--          ...
--      }
--  ]
function QlessQueue.counts(now, name)
if name then
  local queue = Qless.queue(name)
  local stalled = queue.locks.length(now)
  -- Check for any scheduled jobs that need to be moved
  queue:check_scheduled(now, queue.scheduled.length())
  return {
    name      = name,
    waiting   = queue.work.length(),
    stalled   = stalled,
    running   = queue.locks.length() - stalled,
    scheduled = queue.scheduled.length(),
    depends   = queue.depends.length(),
    recurring = queue.recurring.length(),
    paused    = queue:paused()
  }
else
  local queues = redis.call('zrange', 'ql:queues', 0, -1)
  local response = {}
  for index, qname in ipairs(queues) do
    table.insert(response, QlessQueue.counts(now, qname))
  end
  return response
end
end
-- Get all the attributes of this particular job
function QlessRecurringJob:data()
local job = redis.call(
  'hmget', 'ql:r:' .. self.jid, 'jid', 'klass', 'state', 'queue',
  'priority', 'interval', 'retries', 'count', 'data', 'tags', 'backlog')

if not job[1] then
  return nil
end

return {
  jid          = job[1],
  klass        = job[2],
  state        = job[3],
  queue        = job[4],
  priority     = tonumber(job[5]),
  interval     = tonumber(job[6]),
  retries      = tonumber(job[7]),
  count        = tonumber(job[8]),
  data         = job[9],
  tags         = cjson.decode(job[10]),
  backlog      = tonumber(job[11] or 0)
}
end

-- Update the recurring job data. Key can be:
--      - priority
--      - interval
--      - retries
--      - data
--      - klass
--      - queue
--      - backlog
function QlessRecurringJob:update(now, ...)
local options = {}
-- Make sure that the job exists
if redis.call('exists', 'ql:r:' .. self.jid) ~= 0 then
  for i = 1, #arg, 2 do
    local key = arg[i]
    local value = arg[i+1]
    assert(value, 'No value provided for ' .. tostring(key))
    if key == 'priority' or key == 'interval' or key == 'retries' then
      value = assert(tonumber(value), 'Recur(): Arg "' .. key .. '" must be a number: ' .. tostring(value))
      -- If the command is 'interval', then we need to update the
      -- time when it should next be scheduled
      if key == 'interval' then
        local queue, interval = unpack(redis.call('hmget', 'ql:r:' .. self.jid, 'queue', 'interval'))
        Qless.queue(queue).recurring.update(
          value - tonumber(interval), self.jid)
      end
      redis.call('hset', 'ql:r:' .. self.jid, key, value)
    elseif key == 'data' then
      assert(cjson.decode(value), 'Recur(): Arg "data" is not JSON-encoded: ' .. tostring(value))
      redis.call('hset', 'ql:r:' .. self.jid, 'data', value)
    elseif key == 'klass' then
      redis.call('hset', 'ql:r:' .. self.jid, 'klass', value)
    elseif key == 'queue' then
      local queue_obj = Qless.queue(
        redis.call('hget', 'ql:r:' .. self.jid, 'queue'))
      local score = queue_obj.recurring.score(self.jid)
      queue_obj.recurring.remove(self.jid)
      Qless.queue(value).recurring.add(score, self.jid)
      redis.call('hset', 'ql:r:' .. self.jid, 'queue', value)
      -- If we don't already know about the queue, learn about it
      if redis.call('zscore', 'ql:queues', value) == false then
        redis.call('zadd', 'ql:queues', now, value)
      end
    elseif key == 'backlog' then
      value = assert(tonumber(value),
        'Recur(): Arg "backlog" not a number: ' .. tostring(value))
      redis.call('hset', 'ql:r:' .. self.jid, 'backlog', value)
    else
      error('Recur(): Unrecognized option "' .. key .. '"')
    end
  end
  return true
else
  error('Recur(): No recurring job ' .. self.jid)
end
end

-- Tags this recurring job with the provided tags
function QlessRecurringJob:tag(...)
local tags = redis.call('hget', 'ql:r:' .. self.jid, 'tags')
-- If the job has been canceled / deleted, then return false
if tags then
  -- Decode the json blob, convert to dictionary
  tags = cjson.decode(tags)
  local _tags = {}
  for i,v in ipairs(tags) do _tags[v] = true end

  -- Otherwise, add the job to the sorted set with that tags
  for i=1,#arg do if _tags[arg[i]] == nil or _tags[arg[i]] == false then table.insert(tags, arg[i]) end end

  tags = cjson.encode(tags)
  redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags)
  return tags
else
  error('Tag(): Job ' .. self.jid .. ' does not exist')
end
end

-- Removes a tag from the recurring job
function QlessRecurringJob:untag(...)
-- Get the existing tags
local tags = redis.call('hget', 'ql:r:' .. self.jid, 'tags')
-- If the job has been canceled / deleted, then return false
if tags then
  -- Decode the json blob, convert to dictionary
  tags = cjson.decode(tags)
  local _tags    = {}
  -- Make a hash
  for i,v in ipairs(tags) do _tags[v] = true end
  -- Delete these from the hash
  for i = 1,#arg do _tags[arg[i]] = nil end
  -- Back into a list
  local results = {}
  for i, tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end
  -- json encode them, set, and return
  tags = cjson.encode(results)
  redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags)
  return tags
else
  error('Untag(): Job ' .. self.jid .. ' does not exist')
end
end

-- Stop further occurrences of this job
function QlessRecurringJob:unrecur()
-- First, find out what queue it was attached to
local queue = redis.call('hget', 'ql:r:' .. self.jid, 'queue')
if queue then
  -- Now, delete it from the queue it was attached to, and delete the
  -- thing itself
  Qless.queue(queue).recurring.remove(self.jid)
  redis.call('del', 'ql:r:' .. self.jid)
  return true
else
  return true
end
end
-- Deregisters these workers from the list of known workers
function QlessWorker.deregister(...)
redis.call('zrem', 'ql:workers', unpack(arg))
end

-- Provide data about all the workers, or if a specific worker is provided,
-- then which jobs that worker is responsible for. If no worker is provided,
-- expect a response of the form:
--
--  [
--      # This is sorted by the recency of activity from that worker
--      {
--          'name'   : 'hostname1-pid1',
--          'jobs'   : 20,
--          'stalled': 0
--      }, {
--          ...
--      }
--  ]
--
-- If a worker id is provided, then expect a response of the form:
--
--  {
--      'jobs': [
--          jid1,
--          jid2,
--          ...
--      ], 'stalled': [
--          jid1,
--          ...
--      ]
--  }
--
function QlessWorker.counts(now, worker)
-- Clean up all the workers' job lists if they're too old. This is
-- determined by the `max-worker-age` configuration, defaulting to the
-- last day. Seems like a 'reasonable' default
local interval = tonumber(Qless.config.get('max-worker-age', 86400))

local workers  = redis.call('zrangebyscore', 'ql:workers', 0, now - interval)
for index, worker in ipairs(workers) do
  redis.call('del', 'ql:w:' .. worker .. ':jobs')
end

-- And now remove them from the list of known workers
redis.call('zremrangebyscore', 'ql:workers', 0, now - interval)

if worker then
  return {
    jobs    = redis.call('zrevrangebyscore', 'ql:w:' .. worker .. ':jobs', now + 8640000, now),
    stalled = redis.call('zrevrangebyscore', 'ql:w:' .. worker .. ':jobs', now, 0)
  }
else
  local response = {}
  local workers = redis.call('zrevrange', 'ql:workers', 0, -1)
  for index, worker in ipairs(workers) do
    table.insert(response, {
      name    = worker,
      jobs    = redis.call('zcount', 'ql:w:' .. worker .. ':jobs', now, now + 8640000),
      stalled = redis.call('zcount', 'ql:w:' .. worker .. ':jobs', 0, now)
    })
  end
  return response
end
end

LUA_SOURCE
QLESS_SOURCE =
<<-LUA_SOURCE.strip.freeze
  local Qless = {
ns = 'ql:'
}

local QlessQueue = {
ns = Qless.ns .. 'q:'
}
QlessQueue.__index = QlessQueue

local QlessWorker = {
ns = Qless.ns .. 'w:'
}
QlessWorker.__index = QlessWorker

local QlessJob = {
ns = Qless.ns .. 'j:'
}
QlessJob.__index = QlessJob

local QlessRecurringJob = {}
QlessRecurringJob.__index = QlessRecurringJob

Qless.config = {}

local function tbl_extend(self, other)
for i, v in ipairs(other) do
  table.insert(self, v)
end
end

function Qless.publish(channel, message)
redis.call('publish', Qless.ns .. channel, message)
end

function Qless.job(jid)
assert(jid, 'Job(): no jid provided')
local job = {}
setmetatable(job, QlessJob)
job.jid = jid
return job
end

function Qless.recurring(jid)
assert(jid, 'Recurring(): no jid provided')
local job = {}
setmetatable(job, QlessRecurringJob)
job.jid = jid
return job
end

function Qless.failed(group, start, limit)
start = assert(tonumber(start or 0),
  'Failed(): Arg "start" is not a number: ' .. (start or 'nil'))
limit = assert(tonumber(limit or 25),
  'Failed(): Arg "limit" is not a number: ' .. (limit or 'nil'))

if group then
  return {
    total = redis.call('llen', 'ql:f:' .. group),
    jobs  = redis.call('lrange', 'ql:f:' .. group, start, start + limit - 1)
  }
else
  local response = {}
  local groups = redis.call('smembers', 'ql:failures')
  for index, group in ipairs(groups) do
    response[group] = redis.call('llen', 'ql:f:' .. group)
  end
  return response
end
end

function Qless.jobs(now, state, ...)
assert(state, 'Jobs(): Arg "state" missing')
if state == 'complete' then
  local offset = assert(tonumber(arg[1] or 0),
    'Jobs(): Arg "offset" not a number: ' .. tostring(arg[1]))
  local count  = assert(tonumber(arg[2] or 25),
    'Jobs(): Arg "count" not a number: ' .. tostring(arg[2]))
  return redis.call('zrevrange', 'ql:completed', offset,
    offset + count - 1)
else
  local name  = assert(arg[1], 'Jobs(): Arg "queue" missing')
  local offset = assert(tonumber(arg[2] or 0),
    'Jobs(): Arg "offset" not a number: ' .. tostring(arg[2]))
  local count  = assert(tonumber(arg[3] or 25),
    'Jobs(): Arg "count" not a number: ' .. tostring(arg[3]))

  local queue = Qless.queue(name)
  if state == 'running' then
    return queue.locks.peek(now, offset, count)
  elseif state == 'stalled' then
    return queue.locks.expired(now, offset, count)
  elseif state == 'scheduled' then
    queue:check_scheduled(now, queue.scheduled.length())
    return queue.scheduled.peek(now, offset, count)
  elseif state == 'depends' then
    return queue.depends.peek(now, offset, count)
  elseif state == 'recurring' then
    return queue.recurring.peek('+inf', offset, count)
  else
    error('Jobs(): Unknown type "' .. state .. '"')
  end
end
end

function Qless.track(now, command, jid)
if command ~= nil then
  assert(jid, 'Track(): Arg "jid" missing')
  assert(Qless.job(jid):exists(), 'Track(): Job does not exist')
  if string.lower(command) == 'track' then
    Qless.publish('track', jid)
    return redis.call('zadd', 'ql:tracked', now, jid)
  elseif string.lower(command) == 'untrack' then
    Qless.publish('untrack', jid)
    return redis.call('zrem', 'ql:tracked', jid)
  else
    error('Track(): Unknown action "' .. command .. '"')
  end
else
  local response = {
    jobs = {},
    expired = {}
  }
  local jids = redis.call('zrange', 'ql:tracked', 0, -1)
  for index, jid in ipairs(jids) do
    local data = Qless.job(jid):data()
    if data then
      table.insert(response.jobs, data)
    else
      table.insert(response.expired, jid)
    end
  end
  return response
end
end

function Qless.tag(now, command, ...)
assert(command,
  'Tag(): Arg "command" must be "add", "remove", "get" or "top"')

if command == 'add' then
  local jid  = assert(arg[1], 'Tag(): Arg "jid" missing')
  local tags = redis.call('hget', QlessJob.ns .. jid, 'tags')
  if tags then
    tags = cjson.decode(tags)
    local _tags = {}
    for i,v in ipairs(tags) do _tags[v] = true end

    for i=2,#arg do
      local tag = arg[i]
      if _tags[tag] == nil or _tags[tag] == false then
        _tags[tag] = true
        table.insert(tags, tag)
      end
      redis.call('zadd', 'ql:t:' .. tag, now, jid)
      redis.call('zincrby', 'ql:tags', 1, tag)
    end

    redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags))
    return tags
  else
    error('Tag(): Job ' .. jid .. ' does not exist')
  end
elseif command == 'remove' then
  local jid  = assert(arg[1], 'Tag(): Arg "jid" missing')
  local tags = redis.call('hget', QlessJob.ns .. jid, 'tags')
  if tags then
    tags = cjson.decode(tags)
    local _tags = {}
    for i,v in ipairs(tags) do _tags[v] = true end

    for i=2,#arg do
      local tag = arg[i]
      _tags[tag] = nil
      redis.call('zrem', 'ql:t:' .. tag, jid)
      redis.call('zincrby', 'ql:tags', -1, tag)
    end

    local results = {}
    for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end

    redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results))
    return results
  else
    error('Tag(): Job ' .. jid .. ' does not exist')
  end
elseif command == 'get' then
  local tag    = assert(arg[1], 'Tag(): Arg "tag" missing')
  local offset = assert(tonumber(arg[2] or 0),
    'Tag(): Arg "offset" not a number: ' .. tostring(arg[2]))
  local count  = assert(tonumber(arg[3] or 25),
    'Tag(): Arg "count" not a number: ' .. tostring(arg[3]))
  return {
    total = redis.call('zcard', 'ql:t:' .. tag),
    jobs  = redis.call('zrange', 'ql:t:' .. tag, offset, offset + count - 1)
  }
elseif command == 'top' then
  local offset = assert(tonumber(arg[1] or 0) , 'Tag(): Arg "offset" not a number: ' .. tostring(arg[1]))
  local count  = assert(tonumber(arg[2] or 25), 'Tag(): Arg "count" not a number: ' .. tostring(arg[2]))
  return redis.call('zrevrangebyscore', 'ql:tags', '+inf', 2, 'limit', offset, count)
else
  error('Tag(): First argument must be "add", "remove" or "get"')
end
end

function Qless.cancel(...)
local dependents = {}
for _, jid in ipairs(arg) do
  dependents[jid] = redis.call(
    'smembers', QlessJob.ns .. jid .. '-dependents') or {}
end

for i, jid in ipairs(arg) do
  for j, dep in ipairs(dependents[jid]) do
    if dependents[dep] == nil or dependents[dep] == false then
      error('Cancel(): ' .. jid .. ' is a dependency of ' .. dep ..
         ' but is not mentioned to be canceled')
    end
  end
end

for _, jid in ipairs(arg) do
  local state, queue, failure, worker = unpack(redis.call(
    'hmget', QlessJob.ns .. jid, 'state', 'queue', 'failure', 'worker'))

  if state ~= 'complete' then
    local encoded = cjson.encode({
      jid    = jid,
      worker = worker,
      event  = 'canceled',
      queue  = queue
    })
    Qless.publish('log', encoded)

    if worker and (worker ~= '') then
      redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)
      Qless.publish('w:' .. worker, encoded)
    end

    if queue then
      local queue = Qless.queue(queue)
      queue.work.remove(jid)
      queue.locks.remove(jid)
      queue.scheduled.remove(jid)
      queue.depends.remove(jid)
    end

    for i, j in ipairs(redis.call(
      'smembers', QlessJob.ns .. jid .. '-dependencies')) do
      redis.call('srem', QlessJob.ns .. j .. '-dependents', jid)
    end

    redis.call('del', QlessJob.ns .. jid .. '-dependencies')

    if state == 'failed' then
      failure = cjson.decode(failure)
      redis.call('lrem', 'ql:f:' .. failure.group, 0, jid)
      if redis.call('llen', 'ql:f:' .. failure.group) == 0 then
        redis.call('srem', 'ql:failures', failure.group)
      end
      local bin = failure.when - (failure.when % 86400)
      local failed = redis.call(
        'hget', 'ql:s:stats:' .. bin .. ':' .. queue, 'failed')
      redis.call('hset',
        'ql:s:stats:' .. bin .. ':' .. queue, 'failed', failed - 1)
    end

    local tags = cjson.decode(
      redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
    for i, tag in ipairs(tags) do
      redis.call('zrem', 'ql:t:' .. tag, jid)
      redis.call('zincrby', 'ql:tags', -1, tag)
    end

    if redis.call('zscore', 'ql:tracked', jid) ~= false then
      Qless.publish('canceled', jid)
    end

    redis.call('del', QlessJob.ns .. jid)
    redis.call('del', QlessJob.ns .. jid .. '-history')
  end
end

return arg
end


Qless.config.defaults = {
['application']        = 'qless',
['heartbeat']          = 60,
['grace-period']       = 10,
['stats-history']      = 30,
['histogram-history']  = 7,
['jobs-history-count'] = 50000,
['jobs-history']       = 604800
}

Qless.config.get = function(key, default)
if key then
  return redis.call('hget', 'ql:config', key) or
    Qless.config.defaults[key] or default
else
  local reply = redis.call('hgetall', 'ql:config')
  for i = 1, #reply, 2 do
    Qless.config.defaults[reply[i]] = reply[i + 1]
  end
  return Qless.config.defaults
end
end

Qless.config.set = function(option, value)
assert(option, 'config.set(): Arg "option" missing')
assert(value , 'config.set(): Arg "value" missing')
Qless.publish('log', cjson.encode({
  event  = 'config_set',
  option = option,
  value  = value
}))

redis.call('hset', 'ql:config', option, value)
end

Qless.config.unset = function(option)
assert(option, 'config.unset(): Arg "option" missing')
Qless.publish('log', cjson.encode({
  event  = 'config_unset',
  option = option
}))

redis.call('hdel', 'ql:config', option)
end

function QlessJob:data(...)
local job = redis.call(
    'hmget', QlessJob.ns .. self.jid, 'jid', 'klass', 'state', 'queue',
    'worker', 'priority', 'expires', 'retries', 'remaining', 'data',
    'tags', 'failure', 'spawned_from_jid')

if not job[1] then
  return nil
end

local data = {
  jid              = job[1],
  klass            = job[2],
  state            = job[3],
  queue            = job[4],
  worker           = job[5] or '',
  tracked          = redis.call(
    'zscore', 'ql:tracked', self.jid) ~= false,
  priority         = tonumber(job[6]),
  expires          = tonumber(job[7]) or 0,
  retries          = tonumber(job[8]),
  remaining        = math.floor(tonumber(job[9])),
  data             = job[10],
  tags             = cjson.decode(job[11]),
  history          = self:history(),
  failure          = cjson.decode(job[12] or '{}'),
  spawned_from_jid = job[13],
  dependents       = redis.call(
    'smembers', QlessJob.ns .. self.jid .. '-dependents'),
  dependencies     = redis.call(
    'smembers', QlessJob.ns .. self.jid .. '-dependencies')
}

if #arg > 0 then
  local response = {}
  for index, key in ipairs(arg) do
    table.insert(response, data[key])
  end
  return response
else
  return data
end
end

function QlessJob:complete(now, worker, queue, raw_data, ...)
assert(worker, 'Complete(): Arg "worker" missing')
assert(queue , 'Complete(): Arg "queue" missing')
local data = assert(cjson.decode(raw_data),
  'Complete(): Arg "data" missing or not JSON: ' .. tostring(raw_data))

local options = {}
for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end

local nextq   = options['next']
local delay   = assert(tonumber(options['delay'] or 0))
local depends = assert(cjson.decode(options['depends'] or '[]'),
  'Complete(): Arg "depends" not JSON: ' .. tostring(options['depends']))

if options['delay'] and nextq == nil then
  error('Complete(): "delay" cannot be used without a "next".')
end

if options['depends'] and nextq == nil then
  error('Complete(): "depends" cannot be used without a "next".')
end

local bin = now - (now % 86400)

local lastworker, state, priority, retries, current_queue = unpack(
  redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state',
    'priority', 'retries', 'queue'))

if lastworker == false then
  error('Complete(): Job ' .. self.jid .. ' does not exist')
elseif (state ~= 'running') then
  error('Complete(): Job ' .. self.jid .. ' is not currently running: ' ..
    state)
elseif lastworker ~= worker then
  error('Complete(): Job ' .. self.jid ..
    ' has been handed out to another worker: ' .. tostring(lastworker))
elseif queue ~= current_queue then
  error('Complete(): Job ' .. self.jid .. ' running in another queue: ' ..
    tostring(current_queue))
end

self:history(now, 'done')

if raw_data then
  redis.call('hset', QlessJob.ns .. self.jid, 'data', raw_data)
end

local queue_obj = Qless.queue(queue)
queue_obj.work.remove(self.jid)
queue_obj.locks.remove(self.jid)
queue_obj.scheduled.remove(self.jid)

local time = tonumber(
  redis.call('hget', QlessJob.ns .. self.jid, 'time') or now)
local waiting = now - time
Qless.queue(queue):stat(now, 'run', waiting)
redis.call('hset', QlessJob.ns .. self.jid,
  'time', string.format("%.20f", now))

redis.call('zrem', 'ql:w:' .. worker .. ':jobs', self.jid)

if redis.call('zscore', 'ql:tracked', self.jid) ~= false then
  Qless.publish('completed', self.jid)
end

if nextq then
  queue_obj = Qless.queue(nextq)
  Qless.publish('log', cjson.encode({
    jid   = self.jid,
    event = 'advanced',
    queue = queue,
    to    = nextq
  }))

  self:history(now, 'put', {q = nextq})

  if redis.call('zscore', 'ql:queues', nextq) == false then
    redis.call('zadd', 'ql:queues', now, nextq)
  end

  redis.call('hmset', QlessJob.ns .. self.jid,
    'state', 'waiting',
    'worker', '',
    'failure', '{}',
    'queue', nextq,
    'expires', 0,
    'remaining', tonumber(retries))

  if (delay > 0) and (#depends == 0) then
    queue_obj.scheduled.add(now + delay, self.jid)
    return 'scheduled'
  else
    local count = 0
    for i, j in ipairs(depends) do
      local state = redis.call('hget', QlessJob.ns .. j, 'state')
      if (state and state ~= 'complete') then
        count = count + 1
        redis.call(
          'sadd', QlessJob.ns .. j .. '-dependents',self.jid)
        redis.call(
          'sadd', QlessJob.ns .. self.jid .. '-dependencies', j)
      end
    end
    if count > 0 then
      queue_obj.depends.add(now, self.jid)
      redis.call('hset', QlessJob.ns .. self.jid, 'state', 'depends')
      if delay > 0 then
        queue_obj.depends.add(now, self.jid)
        redis.call('hset', QlessJob.ns .. self.jid, 'scheduled', now + delay)
      end
      return 'depends'
    else
      queue_obj.work.add(now, priority, self.jid)
      return 'waiting'
    end
  end
else
  Qless.publish('log', cjson.encode({
    jid   = self.jid,
    event = 'completed',
    queue = queue
  }))

  redis.call('hmset', QlessJob.ns .. self.jid,
    'state', 'complete',
    'worker', '',
    'failure', '{}',
    'queue', '',
    'expires', 0,
    'remaining', tonumber(retries))

  local count = Qless.config.get('jobs-history-count')
  local time  = Qless.config.get('jobs-history')

  count = tonumber(count or 50000)
  time  = tonumber(time  or 7 * 24 * 60 * 60)

  redis.call('zadd', 'ql:completed', now, self.jid)

  local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time)
  for index, jid in ipairs(jids) do
    local tags = cjson.decode(
      redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
    for i, tag in ipairs(tags) do
      redis.call('zrem', 'ql:t:' .. tag, jid)
      redis.call('zincrby', 'ql:tags', -1, tag)
    end
    redis.call('del', QlessJob.ns .. jid)
    redis.call('del', QlessJob.ns .. jid .. '-history')
  end
  redis.call('zremrangebyscore', 'ql:completed', 0, now - time)

  jids = redis.call('zrange', 'ql:completed', 0, (-1-count))
  for index, jid in ipairs(jids) do
    local tags = cjson.decode(
      redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
    for i, tag in ipairs(tags) do
      redis.call('zrem', 'ql:t:' .. tag, jid)
      redis.call('zincrby', 'ql:tags', -1, tag)
    end
    redis.call('del', QlessJob.ns .. jid)
    redis.call('del', QlessJob.ns .. jid .. '-history')
  end
  redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count))

  for i, j in ipairs(redis.call(
    'smembers', QlessJob.ns .. self.jid .. '-dependents')) do
    redis.call('srem', QlessJob.ns .. j .. '-dependencies', self.jid)
    if redis.call(
      'scard', QlessJob.ns .. j .. '-dependencies') == 0 then
      local q, p, scheduled = unpack(
        redis.call('hmget', QlessJob.ns .. j, 'queue', 'priority', 'scheduled'))
      if q then
        local queue = Qless.queue(q)
        queue.depends.remove(j)
        if scheduled then
          queue.scheduled.add(scheduled, j)
          redis.call('hset', QlessJob.ns .. j, 'state', 'scheduled')
          redis.call('hdel', QlessJob.ns .. j, 'scheduled')
        else
          queue.work.add(now, p, j)
          redis.call('hset', QlessJob.ns .. j, 'state', 'waiting')
        end
      end
    end
  end

  redis.call('del', QlessJob.ns .. self.jid .. '-dependents')

  return 'complete'
end
end

function QlessJob:fail(now, worker, group, message, data)
local worker  = assert(worker           , 'Fail(): Arg "worker" missing')
local group   = assert(group            , 'Fail(): Arg "group" missing')
local message = assert(message          , 'Fail(): Arg "message" missing')

local bin = now - (now % 86400)

if data then
  data = cjson.decode(data)
end

local queue, state, oldworker = unpack(redis.call(
  'hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'worker'))

if not state then
  error('Fail(): Job ' .. self.jid .. 'does not exist')
elseif state ~= 'running' then
  error('Fail(): Job ' .. self.jid .. 'not currently running: ' .. state)
elseif worker ~= oldworker then
  error('Fail(): Job ' .. self.jid .. ' running with another worker: ' ..
    oldworker)
end

Qless.publish('log', cjson.encode({
  jid     = self.jid,
  event   = 'failed',
  worker  = worker,
  group   = group,
  message = message
}))

if redis.call('zscore', 'ql:tracked', self.jid) ~= false then
  Qless.publish('failed', self.jid)
end

redis.call('zrem', 'ql:w:' .. worker .. ':jobs', self.jid)

self:history(now, 'failed', {worker = worker, group = group})

redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failures', 1)
redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failed'  , 1)

local queue_obj = Qless.queue(queue)
queue_obj.work.remove(self.jid)
queue_obj.locks.remove(self.jid)
queue_obj.scheduled.remove(self.jid)

if data then
  redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data))
end

redis.call('hmset', QlessJob.ns .. self.jid,
  'state', 'failed',
  'worker', '',
  'expires', '',
  'failure', cjson.encode({
    ['group']   = group,
    ['message'] = message,
    ['when']    = math.floor(now),
    ['worker']  = worker
  }))

redis.call('sadd', 'ql:failures', group)
redis.call('lpush', 'ql:f:' .. group, self.jid)


return self.jid
end

function QlessJob:retry(now, queue, worker, delay, group, message)
assert(queue , 'Retry(): Arg "queue" missing')
assert(worker, 'Retry(): Arg "worker" missing')
delay = assert(tonumber(delay or 0),
  'Retry(): Arg "delay" not a number: ' .. tostring(delay))

local oldqueue, state, retries, oldworker, priority, failure = unpack(
  redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state',
    'retries', 'worker', 'priority', 'failure'))

if oldworker == false then
  error('Retry(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
  error('Retry(): Job ' .. self.jid .. ' is not currently running: ' ..
    state)
elseif oldworker ~= worker then
  error('Retry(): Job ' .. self.jid ..
    ' has been given to another worker: ' .. oldworker)
end

local remaining = tonumber(redis.call(
  'hincrby', QlessJob.ns .. self.jid, 'remaining', -1))
redis.call('hdel', QlessJob.ns .. self.jid, 'grace')

Qless.queue(oldqueue).locks.remove(self.jid)

redis.call('zrem', 'ql:w:' .. worker .. ':jobs', self.jid)

if remaining < 0 then
  local group = group or 'failed-retries-' .. queue
  self:history(now, 'failed', {['group'] = group})

  redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'failed',
    'worker', '',
    'expires', '')
  if group ~= nil and message ~= nil then
    redis.call('hset', QlessJob.ns .. self.jid,
      'failure', cjson.encode({
        ['group']   = group,
        ['message'] = message,
        ['when']    = math.floor(now),
        ['worker']  = worker
      })
    )
  else
    redis.call('hset', QlessJob.ns .. self.jid,
    'failure', cjson.encode({
      ['group']   = group,
      ['message'] =
        'Job exhausted retries in queue "' .. oldqueue .. '"',
      ['when']    = now,
      ['worker']  = unpack(self:data('worker'))
    }))
  end

  if redis.call('zscore', 'ql:tracked', self.jid) ~= false then
    Qless.publish('failed', self.jid)
  end

  redis.call('sadd', 'ql:failures', group)
  redis.call('lpush', 'ql:f:' .. group, self.jid)
  local bin = now - (now % 86400)
  redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failures', 1)
  redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failed'  , 1)
else
  local queue_obj = Qless.queue(queue)
  if delay > 0 then
    queue_obj.scheduled.add(now + delay, self.jid)
    redis.call('hset', QlessJob.ns .. self.jid, 'state', 'scheduled')
  else
    queue_obj.work.add(now, priority, self.jid)
    redis.call('hset', QlessJob.ns .. self.jid, 'state', 'waiting')
  end

  if group ~= nil and message ~= nil then
    redis.call('hset', QlessJob.ns .. self.jid,
      'failure', cjson.encode({
        ['group']   = group,
        ['message'] = message,
        ['when']    = math.floor(now),
        ['worker']  = worker
      })
    )
  end
end

return math.floor(remaining)
end

function QlessJob:depends(now, command, ...)
assert(command, 'Depends(): Arg "command" missing')
local state = redis.call('hget', QlessJob.ns .. self.jid, 'state')
if state ~= 'depends' then
  error('Depends(): Job ' .. self.jid ..
    ' not in the depends state: ' .. tostring(state))
end

if command == 'on' then
  for i, j in ipairs(arg) do
    local state = redis.call('hget', QlessJob.ns .. j, 'state')
    if (state and state ~= 'complete') then
      redis.call(
        'sadd', QlessJob.ns .. j .. '-dependents'  , self.jid)
      redis.call(
        'sadd', QlessJob.ns .. self.jid .. '-dependencies', j)
    end
  end
  return true
elseif command == 'off' then
  if arg[1] == 'all' then
    for i, j in ipairs(redis.call(
      'smembers', QlessJob.ns .. self.jid .. '-dependencies')) do
      redis.call('srem', QlessJob.ns .. j .. '-dependents', self.jid)
    end
    redis.call('del', QlessJob.ns .. self.jid .. '-dependencies')
    local q, p = unpack(redis.call(
      'hmget', QlessJob.ns .. self.jid, 'queue', 'priority'))
    if q then
      local queue_obj = Qless.queue(q)
      queue_obj.depends.remove(self.jid)
      queue_obj.work.add(now, p, self.jid)
      redis.call('hset', QlessJob.ns .. self.jid, 'state', 'waiting')
    end
  else
    for i, j in ipairs(arg) do
      redis.call('srem', QlessJob.ns .. j .. '-dependents', self.jid)
      redis.call(
        'srem', QlessJob.ns .. self.jid .. '-dependencies', j)
      if redis.call('scard',
        QlessJob.ns .. self.jid .. '-dependencies') == 0 then
        local q, p = unpack(redis.call(
          'hmget', QlessJob.ns .. self.jid, 'queue', 'priority'))
        if q then
          local queue_obj = Qless.queue(q)
          queue_obj.depends.remove(self.jid)
          queue_obj.work.add(now, p, self.jid)
          redis.call('hset',
            QlessJob.ns .. self.jid, 'state', 'waiting')
        end
      end
    end
  end
  return true
else
  error('Depends(): Argument "command" must be "on" or "off"')
end
end

function QlessJob:heartbeat(now, worker, data)
assert(worker, 'Heatbeat(): Arg "worker" missing')

local queue = redis.call('hget', QlessJob.ns .. self.jid, 'queue') or ''
local expires = now + tonumber(
  Qless.config.get(queue .. '-heartbeat') or
  Qless.config.get('heartbeat', 60))

if data then
  data = cjson.decode(data)
end

local job_worker, state = unpack(
  redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state'))
if job_worker == false then
  error('Heartbeat(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
  error(
    'Heartbeat(): Job ' .. self.jid .. ' not currently running: ' .. state)
elseif job_worker ~= worker or #job_worker == 0 then
  error(
    'Heartbeat(): Job ' .. self.jid ..
    ' given out to another worker: ' .. job_worker)
else
  if data then
    redis.call('hmset', QlessJob.ns .. self.jid, 'expires',
      expires, 'worker', worker, 'data', cjson.encode(data))
  else
    redis.call('hmset', QlessJob.ns .. self.jid,
      'expires', expires, 'worker', worker)
  end

  redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid)

  redis.call('zadd', 'ql:workers', now, worker)

  local queue = Qless.queue(
    redis.call('hget', QlessJob.ns .. self.jid, 'queue'))
  queue.locks.add(expires, self.jid)
  return expires
end
end

function QlessJob:priority(priority)
priority = assert(tonumber(priority),
  'Priority(): Arg "priority" missing or not a number: ' ..
  tostring(priority))

local queue = redis.call('hget', QlessJob.ns .. self.jid, 'queue')

if queue == nil or queue == false then
  error('Priority(): Job ' .. self.jid .. ' does not exist')
elseif queue == '' then
  redis.call('hset', QlessJob.ns .. self.jid, 'priority', priority)
  return priority
else
  local queue_obj = Qless.queue(queue)
  if queue_obj.work.score(self.jid) then
    queue_obj.work.add(0, priority, self.jid)
  end
  redis.call('hset', QlessJob.ns .. self.jid, 'priority', priority)
  return priority
end
end

function QlessJob:update(data)
local tmp = {}
for k, v in pairs(data) do
  table.insert(tmp, k)
  table.insert(tmp, v)
end
redis.call('hmset', QlessJob.ns .. self.jid, unpack(tmp))
end

function QlessJob:timeout(now)
local queue_name, state, worker = unpack(redis.call('hmget',
  QlessJob.ns .. self.jid, 'queue', 'state', 'worker'))
if queue_name == nil or queue_name == false then
  error('Timeout(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
  error('Timeout(): Job ' .. self.jid .. ' not running')
else
  self:history(now, 'timed-out')
  local queue = Qless.queue(queue_name)
  queue.locks.remove(self.jid)
  queue.work.add(now, '+inf', self.jid)
  redis.call('hmset', QlessJob.ns .. self.jid,
    'state', 'stalled', 'expires', 0)
  local encoded = cjson.encode({
    jid    = self.jid,
    event  = 'lock_lost',
    worker = worker
  })
  Qless.publish('w:' .. worker, encoded)
  Qless.publish('log', encoded)
  return queue_name
end
end

function QlessJob:exists()
return redis.call('exists', QlessJob.ns .. self.jid) == 1
end

function QlessJob:history(now, what, item)
local history = redis.call('hget', QlessJob.ns .. self.jid, 'history')
if history then
  history = cjson.decode(history)
  for i, value in ipairs(history) do
    redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
      cjson.encode({math.floor(value.put), 'put', {q = value.q}}))

    if value.popped then
      redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
        cjson.encode({math.floor(value.popped), 'popped',
          {worker = value.worker}}))
    end

    if value.failed then
      redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
        cjson.encode(
          {math.floor(value.failed), 'failed', nil}))
    end

    if value.done then
      redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
        cjson.encode(
          {math.floor(value.done), 'done', nil}))
    end
  end
  redis.call('hdel', QlessJob.ns .. self.jid, 'history')
end

if what == nil then
  local response = {}
  for i, value in ipairs(redis.call('lrange',
    QlessJob.ns .. self.jid .. '-history', 0, -1)) do
    value = cjson.decode(value)
    local dict = value[3] or {}
    dict['when'] = value[1]
    dict['what'] = value[2]
    table.insert(response, dict)
  end
  return response
else
  local count = tonumber(Qless.config.get('max-job-history', 100))
  if count > 0 then
    local obj = redis.call('lpop', QlessJob.ns .. self.jid .. '-history')
    redis.call('ltrim', QlessJob.ns .. self.jid .. '-history', -count + 2, -1)
    if obj ~= nil and obj ~= false then
      redis.call('lpush', QlessJob.ns .. self.jid .. '-history', obj)
    end
  end
  return redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
    cjson.encode({math.floor(now), what, item}))
end
end
function Qless.queue(name)
assert(name, 'Queue(): no queue name provided')
local queue = {}
setmetatable(queue, QlessQueue)
queue.name = name

queue.work = {
  peek = function(count)
    if count == 0 then
      return {}
    end
    local jids = {}
    for index, jid in ipairs(redis.call(
      'zrevrange', queue:prefix('work'), 0, count - 1)) do
      table.insert(jids, jid)
    end
    return jids
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('work'), unpack(arg))
    end
  end, add = function(now, priority, jid)
    if priority ~= '+inf' then
      priority = priority - (now / 10000000000)
    end
    return redis.call('zadd',
      queue:prefix('work'), priority, jid)
  end, score = function(jid)
    return redis.call('zscore', queue:prefix('work'), jid)
  end, length = function()
    return redis.call('zcard', queue:prefix('work'))
  end
}

queue.locks = {
  expired = function(now, offset, count)
    return redis.call('zrangebyscore',
      queue:prefix('locks'), '-inf', now, 'LIMIT', offset, count)
  end, peek = function(now, offset, count)
    return redis.call('zrangebyscore', queue:prefix('locks'),
      now, '+inf', 'LIMIT', offset, count)
  end, add = function(expires, jid)
    redis.call('zadd', queue:prefix('locks'), expires, jid)
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('locks'), unpack(arg))
    end
  end, running = function(now)
    return redis.call('zcount', queue:prefix('locks'), now, '+inf')
  end, length = function(now)
    if now then
      return redis.call('zcount', queue:prefix('locks'), 0, now)
    else
      return redis.call('zcard', queue:prefix('locks'))
    end
  end
}

queue.depends = {
  peek = function(now, offset, count)
    return redis.call('zrange',
      queue:prefix('depends'), offset, offset + count - 1)
  end, add = function(now, jid)
    redis.call('zadd', queue:prefix('depends'), now, jid)
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('depends'), unpack(arg))
    end
  end, length = function()
    return redis.call('zcard', queue:prefix('depends'))
  end
}

queue.scheduled = {
  peek = function(now, offset, count)
    return redis.call('zrange',
      queue:prefix('scheduled'), offset, offset + count - 1)
  end, ready = function(now, offset, count)
    return redis.call('zrangebyscore',
      queue:prefix('scheduled'), 0, now, 'LIMIT', offset, count)
  end, add = function(when, jid)
    redis.call('zadd', queue:prefix('scheduled'), when, jid)
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('scheduled'), unpack(arg))
    end
  end, length = function()
    return redis.call('zcard', queue:prefix('scheduled'))
  end
}

queue.recurring = {
  peek = function(now, offset, count)
    return redis.call('zrangebyscore', queue:prefix('recur'),
      0, now, 'LIMIT', offset, count)
  end, ready = function(now, offset, count)
  end, add = function(when, jid)
    redis.call('zadd', queue:prefix('recur'), when, jid)
  end, remove = function(...)
    if #arg > 0 then
      return redis.call('zrem', queue:prefix('recur'), unpack(arg))
    end
  end, update = function(increment, jid)
    redis.call('zincrby', queue:prefix('recur'), increment, jid)
  end, score = function(jid)
    return redis.call('zscore', queue:prefix('recur'), jid)
  end, length = function()
    return redis.call('zcard', queue:prefix('recur'))
  end
}
return queue
end

function QlessQueue:prefix(group)
if group then
  return QlessQueue.ns..self.name..'-'..group
else
  return QlessQueue.ns..self.name
end
end

function QlessQueue:stats(now, date)
date = assert(tonumber(date),
  'Stats(): Arg "date" missing or not a number: '.. (date or 'nil'))

local bin = date - (date % 86400)

local histokeys = {
  's0','s1','s2','s3','s4','s5','s6','s7','s8','s9','s10','s11','s12','s13','s14','s15','s16','s17','s18','s19','s20','s21','s22','s23','s24','s25','s26','s27','s28','s29','s30','s31','s32','s33','s34','s35','s36','s37','s38','s39','s40','s41','s42','s43','s44','s45','s46','s47','s48','s49','s50','s51','s52','s53','s54','s55','s56','s57','s58','s59',
  'm1','m2','m3','m4','m5','m6','m7','m8','m9','m10','m11','m12','m13','m14','m15','m16','m17','m18','m19','m20','m21','m22','m23','m24','m25','m26','m27','m28','m29','m30','m31','m32','m33','m34','m35','m36','m37','m38','m39','m40','m41','m42','m43','m44','m45','m46','m47','m48','m49','m50','m51','m52','m53','m54','m55','m56','m57','m58','m59',
  'h1','h2','h3','h4','h5','h6','h7','h8','h9','h10','h11','h12','h13','h14','h15','h16','h17','h18','h19','h20','h21','h22','h23',
  'd1','d2','d3','d4','d5','d6'
}

local mkstats = function(name, bin, queue)
  local results = {}

  local key = 'ql:s:' .. name .. ':' .. bin .. ':' .. queue
  local count, mean, vk = unpack(redis.call('hmget', key, 'total', 'mean', 'vk'))

  count = tonumber(count) or 0
  mean  = tonumber(mean) or 0
  vk    = tonumber(vk)

  results.count     = count or 0
  results.mean      = mean  or 0
  results.histogram = {}

  if not count then
    results.std = 0
  else
    if count > 1 then
      results.std = math.sqrt(vk / (count - 1))
    else
      results.std = 0
    end
  end

  local histogram = redis.call('hmget', key, unpack(histokeys))
  for i=1,#histokeys do
    table.insert(results.histogram, tonumber(histogram[i]) or 0)
  end
  return results
end

local retries, failed, failures = unpack(redis.call('hmget', 'ql:s:stats:' .. bin .. ':' .. self.name, 'retries', 'failed', 'failures'))
return {
  retries  = tonumber(retries  or 0),
  failed   = tonumber(failed   or 0),
  failures = tonumber(failures or 0),
  wait     = mkstats('wait', bin, self.name),
  run      = mkstats('run' , bin, self.name)
}
end

function QlessQueue:peek(now, count)
count = assert(tonumber(count),
  'Peek(): Arg "count" missing or not a number: ' .. tostring(count))

local jids = self.locks.expired(now, 0, count)

self:check_recurring(now, count - #jids)

self:check_scheduled(now, count - #jids)

tbl_extend(jids, self.work.peek(count - #jids))

return jids
end

function QlessQueue:paused()
return redis.call('sismember', 'ql:paused_queues', self.name) == 1
end

function QlessQueue.pause(now, ...)
redis.call('sadd', 'ql:paused_queues', unpack(arg))
end

function QlessQueue.unpause(...)
redis.call('srem', 'ql:paused_queues', unpack(arg))
end

function QlessQueue:pop(now, worker, count)
assert(worker, 'Pop(): Arg "worker" missing')
count = assert(tonumber(count),
  'Pop(): Arg "count" missing or not a number: ' .. tostring(count))

local expires = now + tonumber(
  Qless.config.get(self.name .. '-heartbeat') or
  Qless.config.get('heartbeat', 60))

if self:paused() then
  return {}
end

redis.call('zadd', 'ql:workers', now, worker)

local max_concurrency = tonumber(
  Qless.config.get(self.name .. '-max-concurrency', 0))

if max_concurrency > 0 then
  local allowed = math.max(0, max_concurrency - self.locks.running(now))
  count = math.min(allowed, count)
  if count == 0 then
    return {}
  end
end

local jids = self:invalidate_locks(now, count)

self:check_recurring(now, count - #jids)

self:check_scheduled(now, count - #jids)

tbl_extend(jids, self.work.peek(count - #jids))

local state
for index, jid in ipairs(jids) do
  local job = Qless.job(jid)
  state = unpack(job:data('state'))
  job:history(now, 'popped', {worker = worker})

  local time = tonumber(
    redis.call('hget', QlessJob.ns .. jid, 'time') or now)
  local waiting = now - time
  self:stat(now, 'wait', waiting)
  redis.call('hset', QlessJob.ns .. jid,
    'time', string.format("%.20f", now))

  redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid)

  job:update({
    worker  = worker,
    expires = expires,
    state   = 'running'
  })

  self.locks.add(expires, jid)

  local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false
  if tracked then
    Qless.publish('popped', jid)
  end
end

self.work.remove(unpack(jids))

return jids
end

function QlessQueue:stat(now, stat, val)
local bin = now - (now % 86400)
local key = 'ql:s:' .. stat .. ':' .. bin .. ':' .. self.name

local count, mean, vk = unpack(
  redis.call('hmget', key, 'total', 'mean', 'vk'))

count = count or 0
if count == 0 then
  mean  = val
  vk    = 0
  count = 1
else
  count = count + 1
  local oldmean = mean
  mean  = mean + (val - mean) / count
  vk    = vk + (val - mean) * (val - oldmean)
end

val = math.floor(val)
if val < 60 then -- seconds
  redis.call('hincrby', key, 's' .. val, 1)
elseif val < 3600 then -- minutes
  redis.call('hincrby', key, 'm' .. math.floor(val / 60), 1)
elseif val < 86400 then -- hours
  redis.call('hincrby', key, 'h' .. math.floor(val / 3600), 1)
else -- days
  redis.call('hincrby', key, 'd' .. math.floor(val / 86400), 1)
end
redis.call('hmset', key, 'total', count, 'mean', mean, 'vk', vk)
end

function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)
assert(jid  , 'Put(): Arg "jid" missing')
assert(klass, 'Put(): Arg "klass" missing')
local data = assert(cjson.decode(raw_data),
  'Put(): Arg "data" missing or not JSON: ' .. tostring(raw_data))
delay = assert(tonumber(delay),
  'Put(): Arg "delay" not a number: ' .. tostring(delay))

if #arg % 2 == 1 then
  error('Odd number of additional args: ' .. tostring(arg))
end
local options = {}
for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end

local job = Qless.job(jid)
local priority, tags, oldqueue, state, failure, retries, oldworker =
  unpack(redis.call('hmget', QlessJob.ns .. jid, 'priority', 'tags',
    'queue', 'state', 'failure', 'retries', 'worker'))

if tags then
  Qless.tag(now, 'remove', jid, unpack(cjson.decode(tags)))
end

retries  = assert(tonumber(options['retries']  or retries or 5) ,
  'Put(): Arg "retries" not a number: ' .. tostring(options['retries']))
tags     = assert(cjson.decode(options['tags'] or tags or '[]' ),
  'Put(): Arg "tags" not JSON'          .. tostring(options['tags']))
priority = assert(tonumber(options['priority'] or priority or 0),
  'Put(): Arg "priority" not a number'  .. tostring(options['priority']))
local depends = assert(cjson.decode(options['depends'] or '[]') ,
  'Put(): Arg "depends" not JSON: '     .. tostring(options['depends']))

if #depends > 0 then
  local new = {}
  for _, d in ipairs(depends) do new[d] = 1 end

  local original = redis.call(
    'smembers', QlessJob.ns .. jid .. '-dependencies')
  for _, dep in pairs(original) do
    if new[dep] == nil or new[dep] == false then
      redis.call('srem', QlessJob.ns .. dep .. '-dependents'  , jid)
      redis.call('srem', QlessJob.ns .. jid .. '-dependencies', dep)
    end
  end
end

Qless.publish('log', cjson.encode({
  jid   = jid,
  event = 'put',
  queue = self.name
}))

job:history(now, 'put', {q = self.name})

if oldqueue then
  local queue_obj = Qless.queue(oldqueue)
  queue_obj.work.remove(jid)
  queue_obj.locks.remove(jid)
  queue_obj.depends.remove(jid)
  queue_obj.scheduled.remove(jid)
end

if oldworker and oldworker ~= '' then
  redis.call('zrem', 'ql:w:' .. oldworker .. ':jobs', jid)
  if oldworker ~= worker then
    local encoded = cjson.encode({
      jid    = jid,
      event  = 'lock_lost',
      worker = oldworker
    })
    Qless.publish('w:' .. oldworker, encoded)
    Qless.publish('log', encoded)
  end
end

if state == 'complete' then
  redis.call('zrem', 'ql:completed', jid)
end

for i, tag in ipairs(tags) do
  redis.call('zadd', 'ql:t:' .. tag, now, jid)
  redis.call('zincrby', 'ql:tags', 1, tag)
end

if state == 'failed' then
  failure = cjson.decode(failure)
  redis.call('lrem', 'ql:f:' .. failure.group, 0, jid)
  if redis.call('llen', 'ql:f:' .. failure.group) == 0 then
    redis.call('srem', 'ql:failures', failure.group)
  end
  local bin = failure.when - (failure.when % 86400)
  redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. self.name, 'failed'  , -1)
end

redis.call('hmset', QlessJob.ns .. jid,
  'jid'      , jid,
  'klass'    , klass,
  'data'     , raw_data,
  'priority' , priority,
  'tags'     , cjson.encode(tags),
  'state'    , ((delay > 0) and 'scheduled') or 'waiting',
  'worker'   , '',
  'expires'  , 0,
  'queue'    , self.name,
  'retries'  , retries,
  'remaining', retries,
  'time'     , string.format("%.20f", now))

for i, j in ipairs(depends) do
  local state = redis.call('hget', QlessJob.ns .. j, 'state')
  if (state and state ~= 'complete') then
    redis.call('sadd', QlessJob.ns .. j .. '-dependents'  , jid)
    redis.call('sadd', QlessJob.ns .. jid .. '-dependencies', j)
  end
end

if delay > 0 then
  if redis.call('scard', QlessJob.ns .. jid .. '-dependencies') > 0 then
    self.depends.add(now, jid)
    redis.call('hmset', QlessJob.ns .. jid,
      'state', 'depends',
      'scheduled', now + delay)
  else
    self.scheduled.add(now + delay, jid)
  end
else
  if redis.call('scard', QlessJob.ns .. jid .. '-dependencies') > 0 then
    self.depends.add(now, jid)
    redis.call('hset', QlessJob.ns .. jid, 'state', 'depends')
  else
    self.work.add(now, priority, jid)
  end
end

if redis.call('zscore', 'ql:queues', self.name) == false then
  redis.call('zadd', 'ql:queues', now, self.name)
end

if redis.call('zscore', 'ql:tracked', jid) ~= false then
  Qless.publish('put', jid)
end

return jid
end

function QlessQueue:unfail(now, group, count)
assert(group, 'Unfail(): Arg "group" missing')
count = assert(tonumber(count or 25),
  'Unfail(): Arg "count" not a number: ' .. tostring(count))

local jids = redis.call('lrange', 'ql:f:' .. group, -count, -1)

local toinsert = {}
for index, jid in ipairs(jids) do
  local job = Qless.job(jid)
  local data = job:data()
  job:history(now, 'put', {q = self.name})
  redis.call('hmset', QlessJob.ns .. data.jid,
    'state'    , 'waiting',
    'worker'   , '',
    'expires'  , 0,
    'queue'    , self.name,
    'remaining', data.retries or 5)
  self.work.add(now, data.priority, data.jid)
end

redis.call('ltrim', 'ql:f:' .. group, 0, -count - 1)
if (redis.call('llen', 'ql:f:' .. group) == 0) then
  redis.call('srem', 'ql:failures', group)
end

return #jids
end

function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
assert(jid  , 'RecurringJob On(): Arg "jid" missing')
assert(klass, 'RecurringJob On(): Arg "klass" missing')
assert(spec , 'RecurringJob On(): Arg "spec" missing')
local data = assert(cjson.decode(raw_data),
  'RecurringJob On(): Arg "data" not JSON: ' .. tostring(raw_data))

if spec == 'interval' then
  local interval = assert(tonumber(arg[1]),
    'Recur(): Arg "interval" not a number: ' .. tostring(arg[1]))
  local offset   = assert(tonumber(arg[2]),
    'Recur(): Arg "offset" not a number: '   .. tostring(arg[2]))
  if interval <= 0 then
    error('Recur(): Arg "interval" must be greater than 0')
  end

  if #arg % 2 == 1 then
    error('Odd number of additional args: ' .. tostring(arg))
  end

  local options = {}
  for i = 3, #arg, 2 do options[arg[i]] = arg[i + 1] end
  options.tags = assert(cjson.decode(options.tags or '{}'),
    'Recur(): Arg "tags" must be JSON string array: ' .. tostring(
      options.tags))
  options.priority = assert(tonumber(options.priority or 0),
    'Recur(): Arg "priority" not a number: ' .. tostring(
      options.priority))
  options.retries = assert(tonumber(options.retries  or 0),
    'Recur(): Arg "retries" not a number: ' .. tostring(
      options.retries))
  options.backlog = assert(tonumber(options.backlog  or 0),
    'Recur(): Arg "backlog" not a number: ' .. tostring(
      options.backlog))

  local count, old_queue = unpack(redis.call('hmget', 'ql:r:' .. jid, 'count', 'queue'))
  count = count or 0

  if old_queue then
    Qless.queue(old_queue).recurring.remove(jid)
  end

  redis.call('hmset', 'ql:r:' .. jid,
    'jid'     , jid,
    'klass'   , klass,
    'data'    , raw_data,
    'priority', options.priority,
    'tags'    , cjson.encode(options.tags or {}),
    'state'   , 'recur',
    'queue'   , self.name,
    'type'    , 'interval',
    'count'   , count,
    'interval', interval,
    'retries' , options.retries,
    'backlog' , options.backlog)
  self.recurring.add(now + offset, jid)

  if redis.call('zscore', 'ql:queues', self.name) == false then
    redis.call('zadd', 'ql:queues', now, self.name)
  end

  return jid
else
  error('Recur(): schedule type "' .. tostring(spec) .. '" unknown')
end
end

function QlessQueue:length()
return  self.locks.length() + self.work.length() + self.scheduled.length()
end

function QlessQueue:check_recurring(now, count)
local moved = 0
local r = self.recurring.peek(now, 0, count)
for index, jid in ipairs(r) do
  local klass, data, priority, tags, retries, interval, backlog = unpack(
    redis.call('hmget', 'ql:r:' .. jid, 'klass', 'data', 'priority',
      'tags', 'retries', 'interval', 'backlog'))
  local _tags = cjson.decode(tags)
  local score = math.floor(tonumber(self.recurring.score(jid)))
  interval = tonumber(interval)

  backlog = tonumber(backlog or 0)
  if backlog ~= 0 then
    local num = ((now - score) / interval)
    if num > backlog then
      score = score + (
        math.ceil(num - backlog) * interval
      )
    end
  end

  while (score <= now) and (moved < count) do
    local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
    moved = moved + 1

    local child_jid = jid .. '-' .. count

    for i, tag in ipairs(_tags) do
      redis.call('zadd', 'ql:t:' .. tag, now, child_jid)
      redis.call('zincrby', 'ql:tags', 1, tag)
    end

    redis.call('hmset', QlessJob.ns .. child_jid,
      'jid'             , child_jid,
      'klass'           , klass,
      'data'            , data,
      'priority'        , priority,
      'tags'            , tags,
      'state'           , 'waiting',
      'worker'          , '',
      'expires'         , 0,
      'queue'           , self.name,
      'retries'         , retries,
      'remaining'       , retries,
      'time'            , string.format("%.20f", score),
      'spawned_from_jid', jid)
    Qless.job(child_jid):history(score, 'put', {q = self.name})

    self.work.add(score, priority, child_jid)

    score = score + interval
    self.recurring.add(score, jid)
  end
end
end

function QlessQueue:check_scheduled(now, count)
local scheduled = self.scheduled.ready(now, 0, count)
for index, jid in ipairs(scheduled) do
  local priority = tonumber(
    redis.call('hget', QlessJob.ns .. jid, 'priority') or 0)
  self.work.add(now, priority, jid)
  self.scheduled.remove(jid)

  redis.call('hset', QlessJob.ns .. jid, 'state', 'waiting')
end
end

function QlessQueue:invalidate_locks(now, count)
local jids = {}
for index, jid in ipairs(self.locks.expired(now, 0, count)) do
  local worker, failure = unpack(
    redis.call('hmget', QlessJob.ns .. jid, 'worker', 'failure'))
  redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)

  local grace_period = tonumber(Qless.config.get('grace-period'))

  local courtesy_sent = tonumber(
    redis.call('hget', QlessJob.ns .. jid, 'grace') or 0)

  local send_message = (courtesy_sent ~= 1)
  local invalidate   = not send_message

  if grace_period <= 0 then
    send_message = true
    invalidate   = true
  end

  if send_message then
    if redis.call('zscore', 'ql:tracked', jid) ~= false then
      Qless.publish('stalled', jid)
    end
    Qless.job(jid):history(now, 'timed-out')
    redis.call('hset', QlessJob.ns .. jid, 'grace', 1)

    local encoded = cjson.encode({
      jid    = jid,
      event  = 'lock_lost',
      worker = worker
    })
    Qless.publish('w:' .. worker, encoded)
    Qless.publish('log', encoded)
    self.locks.add(now + grace_period, jid)

    local bin = now - (now % 86400)
    redis.call('hincrby',
      'ql:s:stats:' .. bin .. ':' .. self.name, 'retries', 1)
  end

  if invalidate then
    redis.call('hdel', QlessJob.ns .. jid, 'grace', 0)

    local remaining = tonumber(redis.call(
      'hincrby', QlessJob.ns .. jid, 'remaining', -1))

    if remaining < 0 then
      self.work.remove(jid)
      self.locks.remove(jid)
      self.scheduled.remove(jid)

      local group = 'failed-retries-' .. Qless.job(jid):data()['queue']
      local job = Qless.job(jid)
      job:history(now, 'failed', {group = group})
      redis.call('hmset', QlessJob.ns .. jid, 'state', 'failed',
        'worker', '',
        'expires', '')
      redis.call('hset', QlessJob.ns .. jid,
      'failure', cjson.encode({
        ['group']   = group,
        ['message'] =
          'Job exhausted retries in queue "' .. self.name .. '"',
        ['when']    = now,
        ['worker']  = unpack(job:data('worker'))
      }))

      redis.call('sadd', 'ql:failures', group)
      redis.call('lpush', 'ql:f:' .. group, jid)

      if redis.call('zscore', 'ql:tracked', jid) ~= false then
        Qless.publish('failed', jid)
      end
      Qless.publish('log', cjson.encode({
        jid     = jid,
        event   = 'failed',
        group   = group,
        worker  = worker,
        message =
          'Job exhausted retries in queue "' .. self.name .. '"'
      }))

      local bin = now - (now % 86400)
      redis.call('hincrby',
        'ql:s:stats:' .. bin .. ':' .. self.name, 'failures', 1)
      redis.call('hincrby',
        'ql:s:stats:' .. bin .. ':' .. self.name, 'failed'  , 1)
    else
      table.insert(jids, jid)
    end
  end
end

return jids
end

function QlessQueue.deregister(...)
redis.call('zrem', Qless.ns .. 'queues', unpack(arg))
end

function QlessQueue.counts(now, name)
if name then
  local queue = Qless.queue(name)
  local stalled = queue.locks.length(now)
  queue:check_scheduled(now, queue.scheduled.length())
  return {
    name      = name,
    waiting   = queue.work.length(),
    stalled   = stalled,
    running   = queue.locks.length() - stalled,
    scheduled = queue.scheduled.length(),
    depends   = queue.depends.length(),
    recurring = queue.recurring.length(),
    paused    = queue:paused()
  }
else
  local queues = redis.call('zrange', 'ql:queues', 0, -1)
  local response = {}
  for index, qname in ipairs(queues) do
    table.insert(response, QlessQueue.counts(now, qname))
  end
  return response
end
end
function QlessRecurringJob:data()
local job = redis.call(
  'hmget', 'ql:r:' .. self.jid, 'jid', 'klass', 'state', 'queue',
  'priority', 'interval', 'retries', 'count', 'data', 'tags', 'backlog')

if not job[1] then
  return nil
end

return {
  jid          = job[1],
  klass        = job[2],
  state        = job[3],
  queue        = job[4],
  priority     = tonumber(job[5]),
  interval     = tonumber(job[6]),
  retries      = tonumber(job[7]),
  count        = tonumber(job[8]),
  data         = job[9],
  tags         = cjson.decode(job[10]),
  backlog      = tonumber(job[11] or 0)
}
end

function QlessRecurringJob:update(now, ...)
local options = {}
if redis.call('exists', 'ql:r:' .. self.jid) ~= 0 then
  for i = 1, #arg, 2 do
    local key = arg[i]
    local value = arg[i+1]
    assert(value, 'No value provided for ' .. tostring(key))
    if key == 'priority' or key == 'interval' or key == 'retries' then
      value = assert(tonumber(value), 'Recur(): Arg "' .. key .. '" must be a number: ' .. tostring(value))
      if key == 'interval' then
        local queue, interval = unpack(redis.call('hmget', 'ql:r:' .. self.jid, 'queue', 'interval'))
        Qless.queue(queue).recurring.update(
          value - tonumber(interval), self.jid)
      end
      redis.call('hset', 'ql:r:' .. self.jid, key, value)
    elseif key == 'data' then
      assert(cjson.decode(value), 'Recur(): Arg "data" is not JSON-encoded: ' .. tostring(value))
      redis.call('hset', 'ql:r:' .. self.jid, 'data', value)
    elseif key == 'klass' then
      redis.call('hset', 'ql:r:' .. self.jid, 'klass', value)
    elseif key == 'queue' then
      local queue_obj = Qless.queue(
        redis.call('hget', 'ql:r:' .. self.jid, 'queue'))
      local score = queue_obj.recurring.score(self.jid)
      queue_obj.recurring.remove(self.jid)
      Qless.queue(value).recurring.add(score, self.jid)
      redis.call('hset', 'ql:r:' .. self.jid, 'queue', value)
      if redis.call('zscore', 'ql:queues', value) == false then
        redis.call('zadd', 'ql:queues', now, value)
      end
    elseif key == 'backlog' then
      value = assert(tonumber(value),
        'Recur(): Arg "backlog" not a number: ' .. tostring(value))
      redis.call('hset', 'ql:r:' .. self.jid, 'backlog', value)
    else
      error('Recur(): Unrecognized option "' .. key .. '"')
    end
  end
  return true
else
  error('Recur(): No recurring job ' .. self.jid)
end
end

function QlessRecurringJob:tag(...)
local tags = redis.call('hget', 'ql:r:' .. self.jid, 'tags')
if tags then
  tags = cjson.decode(tags)
  local _tags = {}
  for i,v in ipairs(tags) do _tags[v] = true end

  for i=1,#arg do if _tags[arg[i]] == nil or _tags[arg[i]] == false then table.insert(tags, arg[i]) end end

  tags = cjson.encode(tags)
  redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags)
  return tags
else
  error('Tag(): Job ' .. self.jid .. ' does not exist')
end
end

function QlessRecurringJob:untag(...)
local tags = redis.call('hget', 'ql:r:' .. self.jid, 'tags')
if tags then
  tags = cjson.decode(tags)
  local _tags    = {}
  for i,v in ipairs(tags) do _tags[v] = true end
  for i = 1,#arg do _tags[arg[i]] = nil end
  local results = {}
  for i, tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end
  tags = cjson.encode(results)
  redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags)
  return tags
else
  error('Untag(): Job ' .. self.jid .. ' does not exist')
end
end

function QlessRecurringJob:unrecur()
local queue = redis.call('hget', 'ql:r:' .. self.jid, 'queue')
if queue then
  Qless.queue(queue).recurring.remove(self.jid)
  redis.call('del', 'ql:r:' .. self.jid)
  return true
else
  return true
end
end
function QlessWorker.deregister(...)
redis.call('zrem', 'ql:workers', unpack(arg))
end

function QlessWorker.counts(now, worker)
local interval = tonumber(Qless.config.get('max-worker-age', 86400))

local workers  = redis.call('zrangebyscore', 'ql:workers', 0, now - interval)
for index, worker in ipairs(workers) do
  redis.call('del', 'ql:w:' .. worker .. ':jobs')
end

redis.call('zremrangebyscore', 'ql:workers', 0, now - interval)

if worker then
  return {
    jobs    = redis.call('zrevrangebyscore', 'ql:w:' .. worker .. ':jobs', now + 8640000, now),
    stalled = redis.call('zrevrangebyscore', 'ql:w:' .. worker .. ':jobs', now, 0)
  }
else
  local response = {}
  local workers = redis.call('zrevrange', 'ql:workers', 0, -1)
  for index, worker in ipairs(workers) do
    table.insert(response, {
      name    = worker,
      jobs    = redis.call('zcount', 'ql:w:' .. worker .. ':jobs', now, now + 8640000),
      stalled = redis.call('zcount', 'ql:w:' .. worker .. ':jobs', 0, now)
    })
  end
  return response
end
end
local QlessAPI = {}

function QlessAPI.get(now, jid)
local data = Qless.job(jid):data()
if not data then
  return nil
end
return cjson.encode(data)
end

function QlessAPI.multiget(now, ...)
local results = {}
for i, jid in ipairs(arg) do
  table.insert(results, Qless.job(jid):data())
end
return cjson.encode(results)
end

QlessAPI['config.get'] = function(now, key)
if not key then
  return cjson.encode(Qless.config.get(key))
else
  return Qless.config.get(key)
end
end

QlessAPI['config.set'] = function(now, key, value)
return Qless.config.set(key, value)
end

QlessAPI['config.unset'] = function(now, key)
return Qless.config.unset(key)
end

QlessAPI.queues = function(now, queue)
return cjson.encode(QlessQueue.counts(now, queue))
end

QlessAPI.complete = function(now, jid, worker, queue, data, ...)
return Qless.job(jid):complete(now, worker, queue, data, unpack(arg))
end

QlessAPI.failed = function(now, group, start, limit)
return cjson.encode(Qless.failed(group, start, limit))
end

QlessAPI.fail = function(now, jid, worker, group, message, data)
return Qless.job(jid):fail(now, worker, group, message, data)
end

QlessAPI.jobs = function(now, state, ...)
return Qless.jobs(now, state, unpack(arg))
end

QlessAPI.retry = function(now, jid, queue, worker, delay, group, message)
return Qless.job(jid):retry(now, queue, worker, delay, group, message)
end

QlessAPI.depends = function(now, jid, command, ...)
return Qless.job(jid):depends(now, command, unpack(arg))
end

QlessAPI.heartbeat = function(now, jid, worker, data)
return Qless.job(jid):heartbeat(now, worker, data)
end

QlessAPI.workers = function(now, worker)
return cjson.encode(QlessWorker.counts(now, worker))
end

QlessAPI.track = function(now, command, jid)
return cjson.encode(Qless.track(now, command, jid))
end

QlessAPI.tag = function(now, command, ...)
return cjson.encode(Qless.tag(now, command, unpack(arg)))
end

QlessAPI.stats = function(now, queue, date)
return cjson.encode(Qless.queue(queue):stats(now, date))
end

QlessAPI.priority = function(now, jid, priority)
return Qless.job(jid):priority(priority)
end

QlessAPI.log = function(now, jid, message, data)
assert(jid, "Log(): Argument 'jid' missing")
assert(message, "Log(): Argument 'message' missing")
if data then
  data = assert(cjson.decode(data),
    "Log(): Argument 'data' not cjson: " .. tostring(data))
end

local job = Qless.job(jid)
assert(job:exists(), 'Log(): Job ' .. jid .. ' does not exist')
job:history(now, message, data)
end

QlessAPI.peek = function(now, queue, count)
local jids = Qless.queue(queue):peek(now, count)
local response = {}
for i, jid in ipairs(jids) do
  table.insert(response, Qless.job(jid):data())
end
return cjson.encode(response)
end

QlessAPI.pop = function(now, queue, worker, count)
local jids = Qless.queue(queue):pop(now, worker, count)
local response = {}
for i, jid in ipairs(jids) do
  table.insert(response, Qless.job(jid):data())
end
return cjson.encode(response)
end

QlessAPI.pause = function(now, ...)
return QlessQueue.pause(now, unpack(arg))
end

QlessAPI.unpause = function(now, ...)
return QlessQueue.unpause(unpack(arg))
end

QlessAPI.cancel = function(now, ...)
return Qless.cancel(unpack(arg))
end

QlessAPI.timeout = function(now, ...)
for _, jid in ipairs(arg) do
  Qless.job(jid):timeout(now)
end
end

QlessAPI.put = function(now, me, queue, jid, klass, data, delay, ...)
return Qless.queue(queue):put(now, me, jid, klass, data, delay, unpack(arg))
end

QlessAPI.requeue = function(now, me, queue, jid, ...)
local job = Qless.job(jid)
assert(job:exists(), 'Requeue(): Job ' .. jid .. ' does not exist')
return QlessAPI.put(now, me, queue, jid, unpack(arg))
end

QlessAPI.unfail = function(now, queue, group, count)
return Qless.queue(queue):unfail(now, group, count)
end

QlessAPI.recur = function(now, queue, jid, klass, data, spec, ...)
return Qless.queue(queue):recur(now, jid, klass, data, spec, unpack(arg))
end

QlessAPI.unrecur = function(now, jid)
return Qless.recurring(jid):unrecur()
end

QlessAPI['recur.get'] = function(now, jid)
local data = Qless.recurring(jid):data()
if not data then
  return nil
end
return cjson.encode(data)
end

QlessAPI['recur.update'] = function(now, jid, ...)
return Qless.recurring(jid):update(now, unpack(arg))
end

QlessAPI['recur.tag'] = function(now, jid, ...)
return Qless.recurring(jid):tag(unpack(arg))
end

QlessAPI['recur.untag'] = function(now, jid, ...)
return Qless.recurring(jid):untag(unpack(arg))
end

QlessAPI.length = function(now, queue)
return Qless.queue(queue):length()
end

QlessAPI['worker.deregister'] = function(now, ...)
return QlessWorker.deregister(unpack(arg))
end

QlessAPI['queue.forget'] = function(now, ...)
QlessQueue.deregister(unpack(arg))
end


if #KEYS > 0 then error('No Keys should be provided') end

local command_name = assert(table.remove(ARGV, 1), 'Must provide a command')
local command      = assert(
QlessAPI[command_name], 'Unknown command ' .. command_name)

local now          = tonumber(table.remove(ARGV, 1))
local now          = assert(
now, 'Arg "now" missing or not a number: ' .. (now or 'nil'))

return command(now, unpack(ARGV))

LUA_SOURCE