Class: Balsamique

Inherits:
Object
  • Object
show all
Defined in:
lib/balsamique.rb,
lib/balsamique/reporter.rb

Defined Under Namespace

Classes: Reporter

Constant Summary collapse

REPORT_RETRY_DELAY =

seconds

60.0
RETRY_DELAY =

seconds

600.0
STATS_SLICE =

seconds

60
STATS_CHUNK =

slices (= 3600 seconds = 1 hr)

60
SET_ID_FLOOR =
<<EOF
-- Raise the id counter to at least ARGV[1].
-- KEYS[1]: hash whose '' field holds the counter.
-- ARGV[1]: requested floor.
-- The counter only ever moves up: it is written when it is missing or lower
-- than the requested floor, and left untouched otherwise.
local wanted = tonumber(ARGV[1])
local current = tonumber(redis.call('hget', KEYS[1], ''))
if current == nil or current < wanted then
  redis.call('hset', KEYS[1], '', wanted)
end
EOF
SET_ID_FLOOR_SHA =
Digest::SHA1.hexdigest(SET_ID_FLOOR)
ENQUEUE_JOB =

Lua script ENQUEUE_JOB takes keys

tasks_h, args_h, jobstat_h, task1_z, queues_h, uniq_h

and args [tasks, args, run_at, uniq]. uniq is optional. If it’s present, the script first checks to see if the key 'u:' .. uniq is already set in the hash uniq_h. If so, the negative of the integer value therein is returned and no job is enqueued. Otherwise, the new integer id is written as that value (and the reverse mapping from id to that key is stored in uniq_h), the tasks_h hash gets the value of tasks (JSON-encoded task list) written under the key id, the args_h hash gets the value args written under the key id, and task1_z gets id zadded with score run_at. Also, the name of task1_z together with run_at is written to jobstat_h under the key id, and queues_h records the id and run_at under the name of task1_z. The value returned from the operation is the id. A successful enqueueing is thus signaled by the return of the job id, while an enqueueing blocked by the uniq_in_flight constraint returns minus the blocking id.

<<EOF
-- Allocate a job id and enqueue the job on its first task queue.
-- KEYS: [1] tasks hash, [2] args hash, [3] job status hash, [4] queue zset,
--       [5] queues hash, [6] uniqueness hash (its '' field is the id counter).
-- ARGV: [1] JSON task list, [2] args, [3] run_at score, [4] optional uniq key.
-- Returns the new id, or minus the id that already holds the uniq key.
local job_id = redis.call('hincrby', KEYS[6], '', 1)
local uniq = ARGV[4]
if uniq then
  local ukey = 'u:' .. uniq
  if redis.call('hsetnx', KEYS[6], ukey, job_id) == 0 then
    -- Another job already holds this key: report its id, negated.
    return (- redis.call('hget', KEYS[6], ukey))
  end
  -- Reverse mapping from job id to its uniq key.
  redis.call('hset', KEYS[6], job_id, ukey)
end
local queue, run_at = KEYS[4], ARGV[3]
redis.call('hset', KEYS[1], job_id, ARGV[1])
redis.call('hset', KEYS[2], job_id, ARGV[2])
redis.call('hset', KEYS[3], job_id, queue .. ',' .. run_at)
redis.call('zadd', queue, run_at, job_id)
-- Remember the most recent id and run_at enqueued on this queue.
redis.call('hset', KEYS[5], queue, job_id .. ',' .. run_at)
return job_id
EOF
ENQUEUE_JOB_SHA =
Digest::SHA1.hexdigest(ENQUEUE_JOB)
DEQUEUE_TASK =

Lua script DEQUEUE_TASK takes keys

[args_h, tasks_h, questats_h, retries_h, task1_z, ...],

and args [timestamp_f, retry_delay, tmod]. It performs a conditional ZPOP on task1_z, where the condition is that the score of the first item is < timestamp_f (strictly earlier). If nothing is available to ZPOP, it tries task2_z, etc. If an id is returned from any ZPOP, it increments the retry count in retries_h and reschedules the task accordingly. Then it writes stats info to questats_h, and returns the job information from args_h and tasks_h.

<<EOF
-- Check out the first due job from the given queues, in order.
-- KEYS: [1] args hash, [2] tasks hash, [3] queue stats hash,
--       [4] retries hash, [5..] queue zsets to try.
-- ARGV: [1] current timestamp, [2] base retry delay, [3] stats slice label.
-- Returns { id, args, tasks, retries } for the first queue whose head is
-- scored strictly below the timestamp; returns nothing when none is due.
local now = tonumber(ARGV[1])
for q = 5, #KEYS do
  local queue = KEYS[q]
  local head = redis.call('zrange', queue, 0, 0, 'withscores')
  if head[2] and tonumber(head[2]) < now then
    local job_id = head[1]
    local retries = redis.call('hincrby', KEYS[4], job_id .. ',' .. queue, 1)
    -- The job stays in the queue, pushed back by an exponentially growing delay.
    redis.call('zadd', queue, now + ARGV[2] * 2 ^ retries, job_id)
    local stats = KEYS[3]
    redis.call('hset', stats, queue .. ',len,' .. ARGV[3],
      redis.call('zcard', queue))
    redis.call('hincrby', stats, queue .. ',dq,' .. ARGV[3], 1)
    redis.call('expire', stats, 90000)
    return({ job_id,
      redis.call('hget', KEYS[1], job_id),
      redis.call('hget', KEYS[2], job_id), retries })
  end
end
EOF
DEQUEUE_TASK_SHA =
Digest::SHA1.hexdigest(DEQUEUE_TASK)
SUCCEED_TASK =
<<EOF
-- Record the successful completion of a job's current task.
-- KEYS, as passed by the Ruby succeed method:
--   [1] tasks hash, [2] status hash, [3] retries hash, [4] report queue,
--   [5] failure zset, [6] failure-details hash, [7] queue of the finished
--   task, and only when another task follows: [8] its queue, [9] queues hash.
-- ARGV: [1] job id, [2] timestamp, [3] updated JSON task list.
-- Returns the job id, or an error reply when KEYS[7] does not match the
-- first unfinished task of the stored list or the retry counter is missing.
local id = ARGV[1]
local ts = ARGV[2]
local tasks = cjson.decode(redis.call('hget', KEYS[1], id))
-- The current task is the first entry without a second element.
local cur_task = ''
for _, task in ipairs(tasks) do
  if not task[2] then cur_task = task[1]; break end
end
-- KEYS[7] must end with the name of the current task.
if (not (string.sub(KEYS[7], - string.len(cur_task)) == cur_task)) then
  return redis.error_reply(
    string.format('task mis-match %s %s %s', id, cur_task, KEYS[7]))
end
-- The retry counter (created on dequeue) must exist; it is deleted together
-- with the job's entry in the finished queue.
if (redis.call('hdel', KEYS[3], id .. ',' .. KEYS[7]) > 0) then
  redis.call('zrem', KEYS[7], id)
else
  -- error_reply takes a single string, so the message is formatted first.
  return redis.error_reply(
    string.format('missing retry count %s %s', id, KEYS[7]))
end
-- The status is 'queue,ts' followed by 'retries,ts' pairs; odd positions
-- from the third on are retry counts whose failure records are dropped.
local status = redis.call('hget', KEYS[2], id)
local i = 0
for r in string.gmatch(status, "[^,]+") do
  i = i + 1
  if (i > 2 and i % 2 == 1) then
    local rkey = id .. ',' .. KEYS[7] .. ',' .. r
    redis.call('zrem', KEYS[5], rkey)
    redis.call('hdel', KEYS[6], rkey)
  end
end
redis.call('hset', KEYS[1], id, ARGV[3])
-- Queue a report for the job, starting from a clean retry counter.
redis.call('hdel', KEYS[3], id .. ',' .. KEYS[4])
redis.call('zadd', KEYS[4], ts, id)
if (KEYS[8]) then
  -- Another task follows: move the job onto its queue.
  redis.call('hset', KEYS[2], id, KEYS[8] .. ',' .. ts)
  redis.call('hdel', KEYS[3], id .. ',' .. KEYS[8])
  redis.call('zadd', KEYS[8], ts, id)
  redis.call('hset', KEYS[9], KEYS[8], id .. ',' .. ts)
else
  -- No task left: the status queue becomes the placeholder '_'.
  redis.call('hset', KEYS[2], id, '_' .. ',' .. ts)
end
return id
EOF
SUCCEED_TASK_SHA =
Digest::SHA1.hexdigest(SUCCEED_TASK)
FAIL_TASK =
<<EOF
-- Record a failed attempt of a job's current task.
-- KEYS, as passed by the Ruby fail method:
--   [1] tasks hash, [2] status hash, [3] retries hash, [4] failure zset,
--   [5] failure-details hash, [6] report queue.
-- ARGV: [1] job id, [2] timestamp, [3] queue key of the task, [4] details.
-- Returns the job id, or an error reply when ARGV[3] does not match the
-- first unfinished task or no retry counter exists for it.
local job_id, fail_ts, queue = ARGV[1], ARGV[2], ARGV[3]
local task_list = cjson.decode(redis.call('hget', KEYS[1], job_id))
-- The current task is the first entry without a second element.
local pending = ''
for _, entry in ipairs(task_list) do
  if not entry[2] then
    pending = entry[1]
    break
  end
end
if string.sub(queue, - string.len(pending)) ~= pending then
  return redis.error_reply(
    string.format('task mismatch %s %s %s', job_id, pending, queue))
end
local retry_field = job_id .. ',' .. queue
local retries = tonumber(redis.call('hget', KEYS[3], retry_field))
if not retries then
  return redis.error_reply(
    string.format('missing retry count %s %s', job_id, queue))
end
-- Failure records are keyed by 'id,queue,retries'.
local fail_key = retry_field .. ',' .. retries
redis.call('zadd', KEYS[4], fail_ts, fail_key)
redis.call('hset', KEYS[5], fail_key, ARGV[4])
-- Append this attempt as a 'retries,ts' pair to the job status.
local status = redis.call('hget', KEYS[2], job_id)
redis.call('hset', KEYS[2], job_id, status .. ',' .. retries .. ',' .. fail_ts)
-- Queue a report for the job, starting from a clean retry counter.
redis.call('hdel', KEYS[3], job_id .. ',' .. KEYS[6])
redis.call('zadd', KEYS[6], fail_ts, job_id)
return job_id
EOF
FAIL_TASK_SHA =
Digest::SHA1.hexdigest(FAIL_TASK)
ACCELERATE_RETRIES =
<<EOF
-- Pull every job scheduled at or after a timestamp forward to that timestamp.
-- KEYS[1]: queue zset.
-- ARGV: [1] spacing added between consecutive jobs, [2] timestamp.
-- Jobs keep their relative order: the first is rescored to the timestamp and
-- each following one is placed one increment later. Returns the job count.
local next_ts = tonumber(ARGV[2])
local step = tonumber(ARGV[1])
local pending = redis.call('zrangebyscore', KEYS[1], next_ts, '+inf')
for n = 1, #pending do
  redis.call('zadd', KEYS[1], next_ts, pending[n])
  next_ts = next_ts + step
end
return #pending
EOF
ACCELERATE_RETRIES_SHA =
Digest::SHA1.hexdigest(ACCELERATE_RETRIES)
RESCHEDULE =
<<EOF
-- Give the listed jobs a new score, but only those already in the queue.
-- KEYS[1]: queue zset.
-- ARGV: [1] new score, [2..] job ids.
local new_score = ARGV[1]
for j = 2, #ARGV do
  local job_id = ARGV[j]
  -- zscore is falsy for ids not in the queue, so absent jobs are not added.
  if redis.call('zscore', KEYS[1], job_id) then
    redis.call('zadd', KEYS[1], new_score, job_id)
  end
end
EOF
RESCHEDULE_SHA =
Digest::SHA1.hexdigest(RESCHEDULE)
REPORT_POP =
<<EOF
-- Check out the head of the report queue when it is due.
-- KEYS: [1] report queue zset, [2] retries hash.
-- ARGV: [1] current timestamp, [2] base retry delay.
-- Returns a JSON array [id, original score, retries] when the head is scored
-- strictly below the timestamp; returns nothing otherwise.
local now = tonumber(ARGV[1])
local head = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
local due_at = tonumber(head[2])
if (due_at and due_at < now) then
  local report_id = head[1]
  local retries = redis.call('hincrby', KEYS[2], report_id .. ',' .. KEYS[1], 1)
  -- The entry stays queued, pushed back by an exponentially growing delay.
  redis.call('zadd', KEYS[1], now + tonumber(ARGV[2]) * 2 ^ retries, report_id)
  return cjson.encode({ report_id, due_at, retries })
end
EOF
REPORT_POP_SHA =
Digest::SHA1.hexdigest(REPORT_POP)
REPORT_COMPLETE =
<<EOF
-- Remove a report from the queue once it has been handled.
-- KEYS: [1] report queue zset, [2] retries hash.  ARGV[1]: job id.
-- The queue entry is removed only if a retry counter existed for it.
local removed = redis.call('hdel', KEYS[2], ARGV[1] .. ',' .. KEYS[1])
if removed > 0 then
  redis.call('zrem', KEYS[1], ARGV[1])
end
EOF
REPORT_COMPLETE_SHA =
Digest::SHA1.hexdigest(REPORT_COMPLETE)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis, namespace = 'bQ') ⇒ Balsamique

Returns a new instance of Balsamique.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/balsamique.rb', line 6

# Bind the object to a Redis client and derive every key name it uses.
#
# @param redis the Redis client used for all commands
# @param namespace [String] prefix shared by every key
def initialize(redis, namespace = 'bQ')
  @redis = redis
  key = ->(suffix) { namespace + ':' + suffix }

  # Prefixes that get a queue name, stats chunk or env topic appended.
  @que_prefix = key.call('que:')
  @questats_prefix = key.call('questats:')
  @env_prefix = key.call('env:')

  # Fixed keys.
  @status = key.call('status')
  @queues = key.call('queues')
  @retries = key.call('retries')
  @failures = key.call('failures')
  @failz = key.call('failz')
  @unique = key.call('unique')
  @tasks = key.call('tasks')
  @args = key.call('args')

  # The report queue lives under the queue prefix like any task queue.
  @report_queue = @que_prefix + '_report'
end

Class Method Details

.assemble_timestamp(chunk, slice) ⇒ Object



74
75
76
# File 'lib/balsamique.rb', line 74

# Rebuild a timestamp in seconds from a chunk number and a slice number.
# Inverse of slice_timestamp, up to the start of the slice.
#
# @param chunk [Integer] chunk number
# @param slice [Integer] slice number within the chunk
# @return [Integer] seconds at which that slice begins
def self.assemble_timestamp(chunk, slice)
  slices = chunk * STATS_CHUNK + slice
  slices * STATS_SLICE
end

.current_task(tasks) ⇒ Object



47
48
49
50
# File 'lib/balsamique.rb', line 47

# Name of the last task entry that carries more than just its name.
#
# @param tasks [Array<Array>] task list, one array per task
# @return the first element of the last entry with more than one element,
#   or nil when no entry has more than one element
def self.current_task(tasks)
  tasks.reverse_each do |entry|
    return entry.first if entry.size > 1
  end
  nil
end

.dec36(s) ⇒ Object



68
69
70
# File 'lib/balsamique.rb', line 68

# Decode a base-36 string into an Integer.
#
# @param s base-36 text; anything responding to to_i(36) (presumably a String)
# @return [Integer] the decoded value
def self.dec36(s)
  s.to_i(36)
end

.dec36_assemble_timestamp(echunk, eslice) ⇒ Object



77
78
79
# File 'lib/balsamique.rb', line 77

# Rebuild a timestamp from base-36 encoded chunk and slice numbers.
#
# @param echunk base-36 encoded chunk number
# @param eslice base-36 encoded slice number
# @return [Integer] seconds, as computed by assemble_timestamp
def self.dec36_assemble_timestamp(echunk, eslice)
  chunk = self.dec36(echunk)
  slice = self.dec36(eslice)
  self.assemble_timestamp(chunk, slice)
end

.enc36(i) ⇒ Object



65
66
67
# File 'lib/balsamique.rb', line 65

# Encode a number as base-36 text.
#
# @param i value responding to to_s(36) (presumably an Integer)
# @return [String] the base-36 representation
def self.enc36(i)
  i.to_s(36)
end

.enc36_slice_timestamp(ts) ⇒ Object



71
72
73
# File 'lib/balsamique.rb', line 71

# Split a timestamp into chunk and slice numbers, both base-36 encoded.
#
# @param ts timestamp in seconds
# @return [Array<String>] encoded chunk number, then encoded slice number
def self.enc36_slice_timestamp(ts)
  chunk, slice = self.slice_timestamp(ts)
  [self.enc36(chunk), self.enc36(slice)]
end

.next_task(tasks) ⇒ Object



42
43
44
45
# File 'lib/balsamique.rb', line 42

# Name of the first task entry that consists of its name only.
#
# @param tasks [Array<Array>] task list, one array per task
# @return the first element of the first one-element entry, or nil when
#   every entry has some other size
def self.next_task(tasks)
  tasks.each do |entry|
    return entry.first if entry.size == 1
  end
  nil
end

.slice_timestamp(ts) ⇒ Object



61
62
63
64
# File 'lib/balsamique.rb', line 61

# Split a timestamp into a chunk number and a slice number within that chunk.
#
# @param ts timestamp in seconds (truncated with to_i)
# @return [Array<Integer>] chunk number, then slice number within the chunk
def self.slice_timestamp(ts)
  (ts.to_i / STATS_SLICE).divmod(STATS_CHUNK)
end

.strip_prefix(str, prefix) ⇒ Object



52
53
54
55
56
57
# File 'lib/balsamique.rb', line 52

# Remove a leading prefix from a string.
#
# @param str [String] text to strip
# @param prefix [String] prefix expected at the start of str
# @return [String, nil] the remainder after the prefix, or nil when str does
#   not start with it
def self.strip_prefix(str, prefix)
  return nil unless str[0, prefix.size] == prefix
  str[prefix.size..-1]
end

Instance Method Details

#accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i) ⇒ Object



289
290
291
292
# File 'lib/balsamique.rb', line 289

# Run the ACCELERATE_RETRIES script against one queue.
#
# @param queue [String] queue name, without the key prefix
# @param incr spacing added between consecutive rescheduled jobs
# @param timestamp jobs scored at or after this value are pulled forward to it
# @return the script's reply (the number of jobs rescheduled)
def accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i)
  queue_key = @que_prefix + queue
  redis_eval(
    ACCELERATE_RETRIES_SHA, ACCELERATE_RETRIES, [queue_key], [incr, timestamp])
end

#complete_report(id) ⇒ Object



548
549
550
551
# File 'lib/balsamique.rb', line 548

# Run the REPORT_COMPLETE script for one job id on the report queue.
#
# @param id job id whose report has been handled
# @return the script's reply
def complete_report(id)
  keys = [@report_queue, @retries]
  redis_eval(REPORT_COMPLETE_SHA, REPORT_COMPLETE, keys, [id])
end

#decode_job_status(status) ⇒ Object



409
410
411
412
413
414
415
416
417
418
# File 'lib/balsamique.rb', line 409

# Parse a status string of the form "queue,ts[,retries,ts]...".
#
# @param status [String] comma-separated status value
# @return [Array] the queue, then an array of timestamps: index 0 holds the
#   first timestamp and index n the timestamp paired with retry count n
def decode_job_status(status)
  queue, first_ts, *pairs = status.split(',')
  timestamps = [first_ts.to_f]
  pairs.each_slice(2) do |retry_no, ts|
    timestamps[retry_no.to_i] = ts.to_f
  end
  return queue, timestamps
end

#delete_queue(queue) ⇒ Object



345
346
347
348
349
350
351
# File 'lib/balsamique.rb', line 345

# Delete a queue and its entry in the queues hash in one transaction.
#
# @param queue queue name, without the key prefix (converted with to_s)
# @return [Boolean] true when the queues hash held an entry for the queue
def delete_queue(queue)
  queue_key = @que_prefix + queue.to_s
  replies = redis.multi do |txn|
    txn.del(queue_key)
    txn.hdel(@queues, queue_key)
  end
  # The last reply is the HDEL count.
  replies.last == 1
end

#dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/balsamique.rb', line 171

# Check out the next due job from the queues of the given tasks.
#
# @param tasks list of task names; their queues are tried in this order
# @param retry_delay base delay handed to the DEQUEUE_TASK script
# @param timestamp current time in seconds
# @return [Hash, nil] :id, :args, :tasks and :retries of the job, or nil when
#   the script returned nothing
def dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f)
  stats_chunk, stats_slice = self.class.enc36_slice_timestamp(timestamp)
  keys = [@args, @tasks, @questats_prefix + stats_chunk, @retries]
  keys.concat(tasks.map { |task| @que_prefix + task.to_s })
  result = redis_eval(
    DEQUEUE_TASK_SHA, DEQUEUE_TASK, keys,
    [timestamp, retry_delay, stats_slice])
  return nil unless result
  id, job_args, job_tasks, retries = result
  { id: id, args: JSON.parse(job_args), tasks: JSON.parse(job_tasks),
    retries: retries }
end

#enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f) ⇒ Object



128
129
130
131
132
133
134
135
136
137
# File 'lib/balsamique.rb', line 128

# Enqueue a job on the queue of its first pending task.
#
# @param tasks [Array<Array>] task list for the job
# @param args job arguments, stored as JSON
# @param uniq_in_flight optional key; at most one job may hold it at a time
# @param run_at score (timestamp) at which the job becomes due
# @return [Array] [false, nil] when the task list has no pending task;
#   otherwise [true, id] on success or [false, id of the blocking job]
def enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f)
  first_task = self.class.next_task(tasks)
  return false, nil unless first_task
  argv = [tasks.to_json, args.to_json, run_at]
  argv.push(uniq_in_flight) if uniq_in_flight
  keys = [
    @tasks, @args, @status, @que_prefix + first_task.to_s, @queues, @unique]
  # The script answers with the new id, or minus the id of the blocking job.
  job_id = redis_eval(ENQUEUE_JOB_SHA, ENQUEUE_JOB, keys, argv)
  [job_id > 0, job_id.abs.to_s]
end

#fail(id, task, details, timestamp = Time.now.to_f) ⇒ Object



269
270
271
272
273
# File 'lib/balsamique.rb', line 269

# Record a failed attempt of a task through the FAIL_TASK script.
#
# @param id job id
# @param task [String] name of the task that failed
# @param details failure details, stored as JSON
# @param timestamp time of the failure in seconds
# @return [Boolean] true when the script echoed the job id back
def fail(id, task, details, timestamp = Time.now.to_f)
  queue_key = @que_prefix + task
  argv = [id, timestamp, queue_key, JSON.generate(details)]
  keys = [@tasks, @status, @retries, @failz, @failures, @report_queue]
  id == redis_eval(FAIL_TASK_SHA, FAIL_TASK, keys, argv)
end

#failures(*args) ⇒ Object



341
342
343
# File 'lib/balsamique.rb', line 341

# Look up recorded failures in a score range, with their details.
#
# @param args forwarded unchanged to get_failz
# @return [Hash] failures grouped by job id, as built by get_failures
def failures(*args)
  failz = get_failz(*args)
  get_failures(failz)
end

#fill_args_tasks(statuses) ⇒ Object



481
482
483
484
485
486
487
488
489
490
491
# File 'lib/balsamique.rb', line 481

# Add the stored args and task list to each status entry, in place.
#
# @param statuses [Hash] job id => status hash; every entry gains :args and
#   :tasks (parsed JSON, or nil when nothing is stored for that id)
# @return [Array] the [id, args, tasks] triples that were processed
def fill_args_tasks(statuses)
  ids = statuses.keys
  # HMGET needs at least one field; with no ids there is nothing to fill.
  return [] if ids.empty?
  args, tasks = redis.multi do |r|
    r.hmget(@args, ids)
    r.hmget(@tasks, ids)
  end
  ids.zip(args, tasks).each do |id, a, t|
    statuses[id][:args] = a && JSON.parse(a)
    statuses[id][:tasks] = t && JSON.parse(t)
  end
end

#fill_job_failures(statuses) ⇒ Object



464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/balsamique.rb', line 464

# Add recorded failure details to each status entry, in place.
#
# @param statuses [Hash] job id => status hash with :task and :timestamps
# @return [Hash] the same statuses hash; entries with failures gain :failures
def fill_job_failures(statuses)
  failz = {}
  statuses.each do |id, status|
    task = status[:task]
    next unless task
    timestamps = status[:timestamps]
    next unless timestamps.size > 1
    queue = @que_prefix + task
    # Index 0 is the status timestamp; index n belongs to retry n.
    (1...timestamps.size).each do |retry_no|
      failz["#{id},#{queue},#{retry_no}"] = timestamps[retry_no]
    end
  end
  get_failures(failz).each do |id, failures|
    statuses[id][:failures] = failures
  end
  statuses
end

#get_env(topic, keys = nil) ⇒ Object



570
571
572
573
574
575
576
577
578
579
580
581
582
# File 'lib/balsamique.rb', line 570

# Read values from the env hash of a topic.
#
# @param topic topic name (converted with to_s)
# @param keys [Array, nil] fields to read; nil reads the whole hash
# @return [Hash] field => value; empty when keys is empty
def get_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.hgetall(hkey) if keys.nil?
  return {} if keys.empty?
  values = redis.hmget(hkey, keys)
  keys.zip(values).each_with_object({}) { |(k, v), result| result[k] = v }
end

#get_failures(failz) ⇒ Object



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/balsamique.rb', line 310

# Fetch failure details for a set of failure keys, grouped by job id.
#
# @param failz [Hash] failure key ("id,queue,retries") => timestamp
# @return [Hash] job id => array of { task:, retries:, ts:, details: };
#   looking up an id that is not present yields an empty array
def get_failures(failz)
  result = Hash.new { Array.new }
  fkeys = failz.keys
  return result if fkeys.empty?
  stored = redis.hmget(@failures, fkeys)
  fkeys.each_with_index do |key, n|
    id, queue, retry_no = key.split(',')
    details = stored[n]
    entry = {
      task: self.class.strip_prefix(queue, @que_prefix),
      retries: retry_no.to_i,
      ts: failz[key],
      details: details && JSON.parse(details) }
    # The default block does not store its array, so assign it back.
    result[id] = result[id] << entry
  end
  result
end

#get_failz(earliest = 0, latest = Time.now.to_f, limit = -100) ⇒ Object



327
328
329
330
331
332
333
334
335
336
337
338
339
# File 'lib/balsamique.rb', line 327

# List failure keys whose score lies between earliest and latest.
#
# @param earliest lowest score to include
# @param latest highest score to include
# @param limit maximum number of entries; a negative value reads from the
#   highest score downwards, a non-negative one from the lowest upwards
# @return [Hash] failure key => score
def get_failz(earliest = 0, latest = Time.now.to_f, limit = -100)
  values =
    if limit >= 0
      redis.zrangebyscore(
        @failz, earliest, latest, limit: [0, limit], with_scores: true)
    else
      redis.zrevrangebyscore(
        @failz, latest, earliest, limit: [0, -limit], with_scores: true)
    end
  values.each_with_object({}) do |pair, result|
    result[pair[0]] = pair[1]
  end
end

#job_status(*ids) ⇒ Object



451
452
453
454
455
456
457
458
459
460
461
462
# File 'lib/balsamique.rb', line 451

# Look up the current queue and timestamps of the given jobs.
#
# @param ids job ids to look up
# @return [Hash] job id => { task:, timestamps: } for every id that has a
#   status entry; :task is nil when the status queue lacks the queue prefix
def job_status(*ids)
  # HMGET rejects an empty field list, so answer directly when no id is given.
  return {} if ids.empty?
  statuses = redis.hmget(@status, *ids)
  result = {}
  ids.zip(statuses).each do |(id, status)|
    next unless status
    queue, timestamps = decode_job_status(status)
    result[id] = {
      task: self.class.strip_prefix(queue, @que_prefix),
      timestamps: timestamps }
  end
  result
end

#pop_report(timestamp = Time.now.to_f) ⇒ Object



535
536
537
538
539
540
# File 'lib/balsamique.rb', line 535

# Check out the next due entry of the report queue.
#
# @param timestamp current time in seconds
# @return the parsed JSON reply of REPORT_POP ([id, score, retries]), or the
#   falsy reply itself when nothing is due
def pop_report(timestamp = Time.now.to_f)
  keys = [@report_queue, @retries]
  encoded = redis_eval(
    REPORT_POP_SHA, REPORT_POP, keys, [timestamp, REPORT_RETRY_DELAY])
  encoded && JSON.parse(encoded)
end

#push_report(id, timestamp = Time.now.to_f) ⇒ Object



513
514
515
516
517
518
# File 'lib/balsamique.rb', line 513

# Put a job on the report queue, resetting its report retry counter.
#
# @param id job id
# @param timestamp score at which the report becomes due
# @return the replies of the transaction
def push_report(id, timestamp = Time.now.to_f)
  retry_field = "#{id},#{@report_queue}"
  redis.multi do |txn|
    txn.hdel(@retries, retry_field)
    txn.zadd(@report_queue, timestamp, id)
  end
end

#put_env(topic, h) ⇒ Object



553
554
555
556
557
558
559
# File 'lib/balsamique.rb', line 553

# Write a hash of values into the env hash of a topic.
#
# @param topic topic name (converted with to_s)
# @param h [Hash] field => value pairs to store
# @return [Boolean, nil] nil when h is empty, otherwise whether HMSET
#   answered 'OK'
def put_env(topic, h)
  return if h.empty?
  kvs = h.each_with_object([]) { |(k, v), flat| flat << k << v }
  hkey = @env_prefix + topic.to_s
  'OK' == redis.hmset(hkey, *kvs)
end

#queue_length(queue) ⇒ Object



390
391
392
# File 'lib/balsamique.rb', line 390

# Number of jobs currently in a queue.
#
# @param queue [String] queue name, without the key prefix
# @return the ZCARD reply, or 0 when that reply is falsy
def queue_length(queue)
  count = redis.zcard(@que_prefix + queue)
  count || 0
end

#queue_peek(queue, n = 1000) ⇒ Object



394
395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/balsamique.rb', line 394

# Show the first jobs of a queue with their scores and retry counts.
#
# @param queue [String] queue name, without the key prefix
# @param n [Integer] maximum number of jobs to return
# @return [Hash] job id => { ts:, retries: } in queue order; empty when the
#   queue holds no jobs
def queue_peek(queue, n = 1000)
  qkey = @que_prefix + queue
  result = {}
  redis.zrange(qkey, 0, n - 1, with_scores: true).each do |item|
    result[item[0]] = { ts: item[1] }
  end
  return result if result.empty?
  ids = result.keys
  retry_fields = ids.map { |id| "#{id},#{qkey}" }
  retries = redis.hmget(@retries, retry_fields)
  ids.zip(retries).each { |id, r| result[id][:retries] = r.to_i }
  result
end

#queue_stats(chunks = 3, latest = Time.now.to_f) ⇒ Object



493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
# File 'lib/balsamique.rb', line 493

# Collect queue statistics from the most recent stats chunks.
#
# @param chunks [Integer] number of chunks to read, newest first
# @param latest timestamp that selects the newest chunk
# @return [Hash] stat name => { slice timestamp => { queue => Integer } }
def queue_stats(chunks = 3, latest = Time.now.to_f)
  last_chunk, = self.class.slice_timestamp(latest)
  stats = {}
  0.upto(chunks - 1) do |chunk_i|
    chunk_ts = self.class.enc36(last_chunk - chunk_i)
    stats_chunk = redis.hgetall(@questats_prefix + chunk_ts)
    next unless stats_chunk
    stats_chunk.each do |key, val|
      # Stats fields are written as "queue key,stat,slice".
      queue_key, stat, slice = key.split(',')
      queue = self.class.strip_prefix(queue_key, @que_prefix)
      timestamp = self.class.dec36_assemble_timestamp(chunk_ts, slice)
      by_time = (stats[stat] ||= {})
      (by_time[timestamp] ||= {})[queue] = val.to_i
    end
  end
  stats
end

#queues ⇒ Object



353
354
355
356
# File 'lib/balsamique.rb', line 353

# Names of all queues recorded in the queues hash.
#
# @return [Array] queue names with the key prefix removed (nil for any field
#   that does not start with the prefix)
def queues
  redis.hgetall(@queues).each_key.map do |key|
    self.class.strip_prefix(key, @que_prefix)
  end
end

#queues_info ⇒ Object



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# File 'lib/balsamique.rb', line 358

# Summarize every queue recorded in the queues hash.
#
# @return [Hash] queue name => { current_ts:, last_id:, last_ts:, total:,
#   ready:, next_id:, next_ts: }; empty when no queue is recorded
def queues_info
  qs_info = redis.hgetall(@queues)
  return {} if qs_info.empty?
  now = Time.now.to_f
  queue_keys = qs_info.keys
  # Three replies per queue, in order: head entry, due count, total count.
  details = redis.multi do |r|
    queue_keys.each do |key|
      r.zrange(key, 0, 0, withscores: true)
      r.zcount(key, 0, now)
      r.zcard(key)
    end
  end
  result = {}
  queue_keys.each_with_index do |key, i|
    head, ready, total = details[3 * i], details[3 * i + 1], details[3 * i + 2]
    last_id, last_ts = qs_info[key].split(',')
    next_info = head.first
    next_id = next_info ? next_info.first : nil
    next_ts = next_info ? next_info.last : nil
    result[self.class.strip_prefix(key, @que_prefix)] = {
      current_ts: now,
      last_id: last_id, last_ts: last_ts.to_f,
      total: total, ready: ready,
      next_id: next_id, next_ts: next_ts
    }
  end
  result
end

#rand_delay(delay = RETRY_DELAY) ⇒ Object



27
28
29
# File 'lib/balsamique.rb', line 27

# Pick a randomized delay between half the given delay and the full delay.
#
# @param delay upper bound of the delay, in seconds
# @return the delay reduced by a random amount of less than half its value
def rand_delay(delay = RETRY_DELAY)
  jitter = 0.5 * rand() * delay
  delay - jitter
end

#redis ⇒ Object



31
32
33
# File 'lib/balsamique.rb', line 31

# The Redis client handed to the constructor.
#
# @return the object stored in @redis
def redis
  @redis
end

#redis_eval(cmd_sha, cmd, keys, argv) ⇒ Object



35
36
37
38
39
40
# File 'lib/balsamique.rb', line 35

# Run a Lua script by its SHA, sending the source only when Redis lacks it.
#
# @param cmd_sha [String] SHA1 digest of the script
# @param cmd [String] script source, used for the EVAL fallback
# @param keys [Array] KEYS passed to the script
# @param argv [Array] ARGV passed to the script
# @return the script's reply
# @raise [Redis::CommandError] when the script itself fails
def redis_eval(cmd_sha, cmd, keys, argv)
  redis.evalsha(cmd_sha, keys, argv)
rescue Redis::CommandError => e
  # Only a script missing from the cache justifies resending the source. Any
  # other error came from the script itself; running it a second time would
  # repeat its work and could replace the real error with a different one.
  raise unless e.message.include?('NOSCRIPT')
  puts "[INFO] Balsamique falling back to EVAL for #{cmd_sha}"
  redis.eval(cmd, keys, argv)
end

#remove_job(id) ⇒ Object



420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/balsamique.rb', line 420

# Delete a job and the bookkeeping attached to it.
#
# Reads the job's status, then in one MULTI drops its args and task list and,
# when the status names a queue under the queue prefix, also removes it from
# that queue along with its retry counter and recorded failures. The status
# is then read again: if unchanged, the status and uniqueness entries are
# deleted; if it changed in the meantime, the whole removal is retried.
#
# @param id job id
# @return nil when the job has no status entry (other return values are
#   whatever the last Redis call answered)
def remove_job(id)
  return unless (status = redis.hget(@status, id))
  queue, timestamps = decode_job_status(status)
  redis.multi do |r|
    # A status queue without the prefix (such as '_') has no queue entry.
    if queue.start_with?(@que_prefix)
      r.zrem(queue, id)
      rkey = "#{id},#{queue}"
      r.hdel(@retries, rkey)
      # One failure key per recorded retry: "id,queue,1", "id,queue,2", ...
      rkeys = []
      timestamps.drop(1).each_with_index do |ts, i|
        rkeys << rkey + ",#{i + 1}"
      end
      if rkeys.size > 0
        r.hdel(@failures, rkeys)
        r.zrem(@failz, rkeys)
      end
    end
    r.hdel(@args, id)
    r.hdel(@tasks, id)
  end
  # Re-read the status to detect a change made while the job was being removed.
  return unless (check_status = redis.hget(@status, id))
  if check_status == status
    redis.hdel(@status, id)
    # The uniqueness hash maps the id to its uniq key; drop both directions.
    if (uid = redis.hget(@unique, id))
      redis.hdel(@unique, [id, uid])
    end
  else
    # The status changed underneath us: start over with the new status.
    remove_job(id)
  end
end

#reschedule(queue, ids, timestamp = Time.now.to_f) ⇒ Object



305
306
307
308
# File 'lib/balsamique.rb', line 305

# Give jobs already in a queue a new due time via the RESCHEDULE script.
#
# @param queue [String] queue name, without the key prefix
# @param ids [Array] job ids; ids not in the queue are left out
# @param timestamp new score for the listed jobs
# @return the script's reply
def reschedule(queue, ids, timestamp = Time.now.to_f)
  queue_key = @que_prefix + queue
  argv = [timestamp] + ids
  redis_eval(RESCHEDULE_SHA, RESCHEDULE, [queue_key], argv)
end

#rm_env(topic, keys = nil) ⇒ Object



561
562
563
564
565
566
567
568
# File 'lib/balsamique.rb', line 561

# Remove values from the env hash of a topic.
#
# @param topic topic name (converted with to_s)
# @param keys [Array, nil] fields to delete; nil deletes the whole hash
# @return the DEL or HDEL reply, or nil when keys is empty
def rm_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.del(hkey) if keys.nil?
  redis.hdel(hkey, keys) unless keys.empty?
end

#set_id_floor(id_floor) ⇒ Object



89
90
91
# File 'lib/balsamique.rb', line 89

# Raise the job id counter to at least the given value.
#
# @param id_floor minimum counter value (converted with to_i)
# @return the reply of the SET_ID_FLOOR script
def set_id_floor(id_floor)
  floor = id_floor.to_i
  redis_eval(SET_ID_FLOOR_SHA, SET_ID_FLOOR, [@unique], [floor])
end

#succeed(id, tasks, timestamp = Time.now.to_f) ⇒ Object



228
229
230
231
232
233
234
235
236
237
# File 'lib/balsamique.rb', line 228

# Record the successful completion of a task through the SUCCEED_TASK script.
#
# @param id job id
# @param tasks [Array<Array>] updated task list, with the finished task
#   already carrying more than its name
# @param timestamp time of completion in seconds
# @return [Boolean] true when the script echoed the job id back
def succeed(id, tasks, timestamp = Time.now.to_f)
  done_task = self.class.current_task(tasks)
  upcoming = self.class.next_task(tasks)
  keys = [
    @tasks, @status, @retries, @report_queue, @failz, @failures,
    @que_prefix + done_task]
  # The next queue and the queues hash are passed only when a task follows.
  keys.push(@que_prefix + upcoming, @queues) if upcoming
  argv = [id, timestamp, tasks.to_json]
  id == redis_eval(SUCCEED_TASK_SHA, SUCCEED_TASK, keys, argv)
end