Class: Balsamique

Inherits:
Object
  • Object
show all
Defined in:
lib/balsamique.rb,
lib/balsamique/reporter.rb

Defined Under Namespace

Classes: Reporter

Constant Summary collapse

REPORT_RETRY_DELAY =

seconds

60.0
RETRY_DELAY =

seconds

600.0
STATS_SLICE =

seconds

60
STATS_CHUNK =

slices (= 3600 seconds = 1 hr)

60
SET_ID_FLOOR =
"local floor = tonumber(ARGV[1])\nlocal id = tonumber(redis.call('hget', KEYS[1], ''))\nif (not id) or floor > id then\n  redis.call('hset', KEYS[1], '', floor)\nend\n"
SET_ID_FLOOR_SHA =
Digest::SHA1.hexdigest(SET_ID_FLOOR)
ENQUEUE_JOB =

Lua script ENQUEUE_JOB takes keys

tasks_h, args_h, jobstat_h, task1_z, queues_h, uniq_h

and args [tasks, args, run_at, uniq]. uniq is optional. If it’s present, the script first checks to see if the key uniq is already set in the hash uniq_h. If so, the negative of the integer value therein is returned and the script does nothing. Otherwise, an integer id is written as that value, the tasks_h hash gets the value of tasks (JSON-encoded task list) written under the key id, the args_h hash gets the value args written under the key id, task1_z gets id zadded with score run_at. Also, task1_z is written to jobstat_h under the key id. The value returned from the operation is the id. A successful enqueueing is thus signaled by the return of the job id, while an enqueueing blocked by the uniq_in_flight constraint returns minus the blocking id.

"local id = redis.call('hincrby', KEYS[6], '', 1)\nif ARGV[4] then\n  local ukey = 'u:' .. ARGV[4]\n  local uniq = redis.call('hsetnx', KEYS[6], ukey, id)\n  if 0 == uniq then\n    return (- redis.call('hget', KEYS[6], ukey))\n  else\n    redis.call('hset', KEYS[6], id, ukey)\n  end\nend\nredis.call('hset', KEYS[1], id, ARGV[1])\nredis.call('hset', KEYS[2], id, ARGV[2])\nredis.call('hset', KEYS[3], id, KEYS[4] .. ',' .. ARGV[3])\nredis.call('zadd', KEYS[4], ARGV[3], id)\nredis.call('hset', KEYS[5], KEYS[4], id .. ',' .. ARGV[3])\nreturn id\n"
ENQUEUE_JOB_SHA =
Digest::SHA1.hexdigest(ENQUEUE_JOB)
DEQUEUE_TASK =

Lua script DEQUEUE_TASK takes keys

[args_h, tasks_h, questats_h, retries_h, task1_z, ...],

and args [timestamp_f, retry_delay, tmod]. It performs a conditional ZPOP on task1_z, where the condition is that the score of the first item is strictly less than timestamp_f. If nothing is available to ZPOP, it tries task2_z, etc. If an id is returned from any ZPOP, it increments the retry count in retries_h and reschedules the task accordingly. Then it writes stats info to questats_h, and returns the job information from args_h and tasks_h.

"local ts = tonumber(ARGV[1])\nlocal i = 5\nwhile KEYS[i] do\n  local elem = redis.call('zrange', KEYS[i], 0, 0, 'withscores')\n  if elem[2] and tonumber(elem[2]) < ts then\n    local retries = redis.call('hincrby', KEYS[4], elem[1] .. ',' .. KEYS[i], 1)\n    local t_retry = ts + ARGV[2] * 2 ^ retries\n    redis.call('zadd', KEYS[i], t_retry, elem[1])\n    redis.call('hset', KEYS[3], KEYS[i] .. ',len,' .. ARGV[3],\n      redis.call('zcard', KEYS[i]))\n    redis.call('hincrby', KEYS[3], KEYS[i] .. ',dq,' .. ARGV[3], 1)\n    redis.call('expire', KEYS[3], 90000)\n    return({ elem[1],\n      redis.call('hget', KEYS[1], elem[1]),\n      redis.call('hget', KEYS[2], elem[1]), retries })\n  end\n  i = i + 1\nend\n"
DEQUEUE_TASK_SHA =
Digest::SHA1.hexdigest(DEQUEUE_TASK)
SUCCEED_TASK =
"local id = ARGV[1]\nlocal ts = ARGV[2]\nlocal tasks = cjson.decode(redis.call('hget', KEYS[1], id))\nlocal cur_task = ''\nfor _, task in ipairs(tasks) do\n  if not task[2] then cur_task = task[1]; break end\nend\nif (not (string.sub(KEYS[7], - string.len(cur_task)) == cur_task)) then\n  return redis.error_reply(\n    string.format('task mis-match %s %s %s', id, cur_task, KEYS[7]))\nend\nif (redis.call('hdel', KEYS[3], id .. ',' .. KEYS[7]) > 0) then\n  redis.call('zrem', KEYS[7], id)\nelse\n  return redis.error_reply('missing retry count %s %s', id, KEYS[7])\nend\nlocal status = redis.call('hget', KEYS[2], id)\nlocal i = 0\nfor r in string.gmatch(status, \"[^,]+\") do\n  i = i + 1\n  if (i > 2 and i % 2 == 1) then\n    local rkey = id .. ',' .. KEYS[7] .. ',' .. r\n    redis.call('zrem', KEYS[5], rkey)\n    redis.call('hdel', KEYS[6], rkey)\n  end\nend\nredis.call('hset', KEYS[1], id, ARGV[3])\nredis.call('hdel', KEYS[3], id .. ',' .. KEYS[4])\nredis.call('zadd', KEYS[4], ts, id)\nif (KEYS[8]) then\n  redis.call('hset', KEYS[2], id, KEYS[8] .. ',' .. ts)\n  redis.call('hdel', KEYS[3], id .. ',' .. KEYS[8])\n  redis.call('zadd', KEYS[8], ts, id)\n  redis.call('hset', KEYS[9], KEYS[8], id .. ',' .. ts)\nelse\n  redis.call('hset', KEYS[2], id, '_' .. ',' .. ts)\nend\nreturn id\n"
SUCCEED_TASK_SHA =
Digest::SHA1.hexdigest(SUCCEED_TASK)
FAIL_TASK =
"local id = ARGV[1]\nlocal ts = ARGV[2]\nlocal tasks = cjson.decode(redis.call('hget', KEYS[1], id))\nlocal cur_task = ''\nfor _, task in ipairs(tasks) do\n  if not task[2] then cur_task = task[1]; break end\nend\nif (not (string.sub(ARGV[3], - string.len(cur_task)) == cur_task)) then\n  return redis.error_reply(\n    string.format('task mismatch %s %s %s', id, cur_task, ARGV[3]))\nend\nlocal rkey = id .. ',' .. ARGV[3]\nlocal retries = tonumber(redis.call('hget', KEYS[3], rkey))\nif (not retries) then\n  return redis.error_reply(\n    string.format('missing retry count %s %s', id, ARGV[3]))\nend\nrkey = rkey .. ',' .. retries\nredis.call('zadd', KEYS[4], ts, rkey)\nredis.call('hset', KEYS[5], rkey, ARGV[4])\nlocal status = redis.call('hget', KEYS[2], id)\nstatus = status .. ',' .. retries .. ',' .. ts\nredis.call('hset', KEYS[2], id, status)\nredis.call('hdel', KEYS[3], id .. ',' .. KEYS[6])\nredis.call('zadd', KEYS[6], ts, id)\nreturn id\n"
FAIL_TASK_SHA =
Digest::SHA1.hexdigest(FAIL_TASK)
ACCELERATE_RETRIES =
"local ts = tonumber(ARGV[2])\nlocal incr = tonumber(ARGV[1])\nlocal jobs = redis.call('zrangebyscore', KEYS[1], ts, '+inf')\nlocal count = 0\nfor _, job in ipairs(jobs) do\n  redis.call('zadd', KEYS[1], ts, job)\n  count = count + 1\n  ts = ts + incr\nend\nreturn count\n"
ACCELERATE_RETRIES_SHA =
Digest::SHA1.hexdigest(ACCELERATE_RETRIES)
REPORT_POP =
"local t_pop = tonumber(ARGV[1])\nlocal elem = redis.call('zrange', KEYS[1], 0, 0, 'withscores')\nlocal t_elem = tonumber(elem[2])\nif (t_elem and t_elem < t_pop) then\n  local retries = redis.call('hincrby', KEYS[2], elem[1] .. ',' .. KEYS[1], 1)\n  local t_retry = t_pop + tonumber(ARGV[2]) * 2 ^ retries\n  redis.call('zadd', KEYS[1], t_retry, elem[1])\n  elem[3] = retries\n  elem[2] = t_elem\n  return cjson.encode(elem)\nend\n"
REPORT_POP_SHA =
Digest::SHA1.hexdigest(REPORT_POP)
REPORT_COMPLETE =
"if (redis.call('hdel', KEYS[2], ARGV[1] .. ',' .. KEYS[1]) > 0) then\n  redis.call('zrem', KEYS[1], ARGV[1])\nend\n"
REPORT_COMPLETE_SHA =
Digest::SHA1.hexdigest(REPORT_COMPLETE)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis, namespace = 'bQ') ⇒ Balsamique



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/balsamique.rb', line 6

# Builds a Balsamique queue client on top of an existing Redis connection.
# All Redis keys used by this instance are scoped under `namespace`.
#
# @param redis [Redis] an already-connected redis-rb client
# @param namespace [String] key prefix separating this queue from others
def initialize(redis, namespace = 'bQ')
  @redis = redis

  # Prefixes completed with a suffix at point of use.
  @que_prefix = namespace + ':que:'        # per-task queue zsets
  @questats_prefix = namespace + ':questats:'  # per-chunk stats hashes
  @env_prefix = namespace + ':env:'        # per-topic environment hashes

  # Fixed key names.
  @status = namespace + ':status'      # job id -> "queue,ts[,retry,ts...]"
  @queues = namespace + ':queues'      # queue key -> "last_id,last_ts"
  @retries = namespace + ':retries'    # "id,queue" -> retry count
  @failures = namespace + ':failures'  # "id,queue,retries" -> failure JSON
  @failz = namespace + ':failz'        # zset of failure keys scored by ts
  @unique = namespace + ':unique'      # id counter + uniq_in_flight markers
  @tasks = namespace + ':tasks'        # job id -> JSON task list
  @args = namespace + ':args'          # job id -> JSON args
  # The report queue is an ordinary queue with a reserved name.
  @report_queue = @que_prefix + '_report'
end

Class Method Details

.assemble_timestamp(chunk, slice) ⇒ Object



74
75
76
# File 'lib/balsamique.rb', line 74

# Inverse of .slice_timestamp: rebuilds an epoch-seconds timestamp from a
# (chunk, slice) pair.
def self.assemble_timestamp(chunk, slice)
  total_slices = chunk * STATS_CHUNK + slice
  total_slices * STATS_SLICE
end

.current_task(tasks) ⇒ Object



47
48
49
50
# File 'lib/balsamique.rb', line 47

# Name of the most recently finished task: the last entry in `tasks` that
# carries more than one element. Returns nil when no task has a result yet.
def self.current_task(tasks)
  finished = tasks.reverse_each.find { |entry| entry.size > 1 }
  finished ? finished.first : nil
end

.dec36(s) ⇒ Object



68
69
70
# File 'lib/balsamique.rb', line 68

# Decodes a base-36 string (as produced by .enc36) back into an Integer.
# Note that String#to_i yields 0 for unparsable input rather than raising.
def self.dec36(str)
  str.to_i(36)
end

.dec36_assemble_timestamp(echunk, eslice) ⇒ Object



77
78
79
# File 'lib/balsamique.rb', line 77

# Rebuilds an epoch-seconds timestamp from base-36 encoded chunk and slice
# strings (the encoding used in questats keys).
def self.dec36_assemble_timestamp(echunk, eslice)
  chunk = self.dec36(echunk)
  slice = self.dec36(eslice)
  self.assemble_timestamp(chunk, slice)
end

.enc36(i) ⇒ Object



65
66
67
# File 'lib/balsamique.rb', line 65

# Encodes an Integer in compact base-36 form (digits then lowercase letters).
def self.enc36(value)
  value.to_s(36)
end

.enc36_slice_timestamp(ts) ⇒ Object



71
72
73
# File 'lib/balsamique.rb', line 71

# Splits a timestamp into (chunk, slice) and base-36 encodes both parts,
# yielding the [echunk, eslice] pair used to address questats keys.
def self.enc36_slice_timestamp(ts)
  chunk, slice = self.slice_timestamp(ts)
  [self.enc36(chunk), self.enc36(slice)]
end

.next_task(tasks) ⇒ Object



42
43
44
45
# File 'lib/balsamique.rb', line 42

# Name of the first task that has not yet run: the first entry in `tasks`
# with exactly one element. Returns nil when every task has a result.
def self.next_task(tasks)
  pending = tasks.find { |entry| entry.size == 1 }
  pending ? pending.first : nil
end

.slice_timestamp(ts) ⇒ Object



61
62
63
64
# File 'lib/balsamique.rb', line 61

# Splits an epoch-seconds timestamp into a [chunk, slice] pair:
# slices are STATS_SLICE seconds wide, chunks are STATS_CHUNK slices.
def self.slice_timestamp(ts)
  (ts.to_i / STATS_SLICE).divmod(STATS_CHUNK)
end

.strip_prefix(str, prefix) ⇒ Object



52
53
54
55
56
57
# File 'lib/balsamique.rb', line 52

# Returns `str` with `prefix` removed when it starts with that prefix,
# or nil when it does not (unlike String#delete_prefix, which would
# return the string unchanged).
def self.strip_prefix(str, prefix)
  return unless str.start_with?(prefix)
  str[prefix.size..-1]
end

Instance Method Details

#accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i) ⇒ Object



289
290
291
292
# File 'lib/balsamique.rb', line 289

# Pulls every entry in `queue` scheduled at or after `timestamp` forward to
# run starting at `timestamp`, spacing successive entries `incr` seconds
# apart. Returns the number of rescheduled entries.
def accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i)
  queue_key = @que_prefix + queue
  redis_eval(
    ACCELERATE_RETRIES_SHA, ACCELERATE_RETRIES, [queue_key],
    [incr, timestamp])
end

#complete_report(id) ⇒ Object



532
533
534
535
# File 'lib/balsamique.rb', line 532

# Acknowledges that job `id` has been reported: removes its retry counter
# and, if one existed, drops it from the report queue.
def complete_report(id)
  keys = [@report_queue, @retries]
  redis_eval(REPORT_COMPLETE_SHA, REPORT_COMPLETE, keys, [id])
end

#decode_job_status(status) ⇒ Object



393
394
395
396
397
398
399
400
401
402
# File 'lib/balsamique.rb', line 393

# Decodes a status-hash value "queue,ts[,retry_no,retry_ts...]" into the
# queue key plus a sparse timestamps array: index 0 is the enqueue
# timestamp and index i is the timestamp of retry number i.
def decode_job_status(status)
  queue, first_ts, *retry_fields = status.split(',')
  timestamps = [first_ts.to_f]
  retry_fields.each_slice(2) do |retry_no, retry_ts|
    timestamps[retry_no.to_i] = retry_ts.to_f
  end
  return queue, timestamps
end

#delete_queue(queue) ⇒ Object



329
330
331
332
333
334
335
# File 'lib/balsamique.rb', line 329

# Deletes the queue zset and unregisters it from the queues hash in a
# single MULTI. Returns true when the registry entry was actually removed.
def delete_queue(queue)
  queue_key = @que_prefix + queue.to_s
  results = redis.multi do |r|
    r.del(queue_key)
    r.hdel(@queues, queue_key)
  end
  results.last == 1
end

#dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/balsamique.rb', line 171

# Pops the first job ready to run (score strictly earlier than `timestamp`)
# from the given task queues, checked in order. A dequeued job is at once
# rescheduled `retry_delay * 2^retries` seconds out, so a worker that dies
# silently only causes a delayed retry; workers acknowledge completion via
# #succeed / #fail. Dequeue statistics are written to the current stats
# chunk. Returns { id:, args:, tasks:, retries: } or nil when nothing is due.
def dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f)
  stats_chunk, stats_slice = self.class.enc36_slice_timestamp(timestamp)
  questats_key = @questats_prefix + stats_chunk
  # KEYS layout required by DEQUEUE_TASK:
  # [args_h, tasks_h, questats_h, retries_h, task1_z, task2_z, ...]
  keys = [@args, @tasks, questats_key, @retries]
  tasks.each { |task| keys << @que_prefix + task.to_s }
  result = redis_eval(
    DEQUEUE_TASK_SHA, DEQUEUE_TASK, keys,
    [timestamp, retry_delay, stats_slice])
  if result
    # Script returns [id, args_json, tasks_json, retry_count].
    id, args, tasks, retries = result
    { id: id, args: JSON.parse(args), tasks: JSON.parse(tasks),
      retries: retries }
  end
end

#enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f) ⇒ Object



128
129
130
131
132
133
134
135
136
137
# File 'lib/balsamique.rb', line 128

# Enqueues a job onto the queue of its first pending task. When
# `uniq_in_flight` is given, at most one job carrying that token may be in
# flight; a blocked enqueue returns the blocking job's id instead.
# Returns [success, id_string]; success is false (with id nil) when the
# task list has no pending task, false with the blocker's id when blocked.
def enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f)
  next_task = self.class.next_task(tasks)
  return false, nil unless next_task
  queue_key = @que_prefix + next_task.to_s
  # KEYS order required by ENQUEUE_JOB:
  # [tasks_h, args_h, jobstat_h, task1_z, queues_h, uniq_h]
  keys = [@tasks, @args, @status, queue_key, @queues, @unique]
  argv = [tasks.to_json, args.to_json, run_at]
  argv << uniq_in_flight if uniq_in_flight
  result_id = redis_eval(ENQUEUE_JOB_SHA, ENQUEUE_JOB, keys, argv)
  # Positive id => enqueued; negative => |id| identifies the in-flight job
  # that blocked this enqueue.
  return result_id > 0, result_id.abs.to_s
end

#fail(id, task, details, timestamp = Time.now.to_f) ⇒ Object



269
270
271
272
273
# File 'lib/balsamique.rb', line 269

# Records a failed attempt of `task` for job `id`: stores the JSON-encoded
# `details` under "id,queue,retries", indexes that key by timestamp in the
# failz zset, appends the retry to the job's status, and schedules the job
# for reporting. Returns true when the script acknowledged this id.
# NOTE: intentionally shadows Kernel#fail on this object.
def fail(id, task, details, timestamp = Time.now.to_f)
  # KEYS order required by FAIL_TASK:
  # [tasks_h, jobstat_h, retries_h, failz_z, failures_h, report_z]
  keys = [@tasks, @status, @retries, @failz, @failures, @report_queue]
  argv = [id, timestamp, @que_prefix + task, JSON.generate(details)]
  id == redis_eval(FAIL_TASK_SHA, FAIL_TASK, keys, argv)
end

#failures(*args) ⇒ Object



325
326
327
# File 'lib/balsamique.rb', line 325

# Convenience wrapper: fetches the failz window selected by `args`
# (see #get_failz) and expands it into full failure details.
def failures(*args)
  window = get_failz(*args)
  get_failures(window)
end

#fill_args_tasks(statuses) ⇒ Object



465
466
467
468
469
470
471
472
473
474
475
# File 'lib/balsamique.rb', line 465

# Decorates each status hash in `statuses` (keyed by job id) with its
# JSON-decoded :args and :tasks, fetched in one MULTI round trip.
# A job missing from either hash gets nil for that field.
def fill_args_tasks(statuses)
  ids = statuses.keys
  args_list, tasks_list = redis.multi do |r|
    r.hmget(@args, ids)
    r.hmget(@tasks, ids)
  end
  ids.zip(args_list, tasks_list).each do |id, raw_args, raw_tasks|
    entry = statuses[id]
    entry[:args] = raw_args && JSON.parse(raw_args)
    entry[:tasks] = raw_tasks && JSON.parse(raw_tasks)
  end
end

#fill_job_failures(statuses) ⇒ Object



448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
# File 'lib/balsamique.rb', line 448

# Decorates each status hash in `statuses` with its :failures list by
# reconstructing the "id,queue,retry_no" failure keys from the retry
# timestamps recorded in the status, then resolving them via #get_failures.
# Returns the (mutated) statuses hash.
def fill_job_failures(statuses)
  failz = {}
  statuses.each do |id, status|
    task = status[:task]
    next unless task
    timestamps = status[:timestamps]
    next if timestamps.size <= 1
    queue = @que_prefix + task
    # Index 0 is the enqueue timestamp; indices >= 1 are retries.
    timestamps.drop(1).each_with_index do |ts, i|
      failz["#{id},#{queue},#{i + 1}"] = ts
    end
  end
  get_failures(failz).each do |id, details|
    statuses[id][:failures] = details
  end
  statuses
end

#get_env(topic, keys = nil) ⇒ Object



554
555
556
557
558
559
560
561
562
563
564
565
566
# File 'lib/balsamique.rb', line 554

# Reads environment values for `topic`. With keys nil the whole hash is
# returned; with an explicit key list, a hash of exactly those keys is
# returned (missing keys map to nil). An empty key list short-circuits.
def get_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.hgetall(hkey) if keys.nil?
  return {} if keys.empty?
  values = redis.hmget(hkey, keys)
  keys.zip(values).to_h
end

#get_failures(failz) ⇒ Object



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/balsamique.rb', line 294

# Expands a { "id,queue_key,retries" => failure_ts } map (as produced by
# #get_failz) into { id => [{ task:, retries:, ts:, details: }, ...] }.
# Failure details are fetched in a single HMGET; an entry missing from the
# failures hash yields details: nil.
def get_failures(failz)
  # The default proc returns a fresh Array but does NOT store it in the
  # hash; that is why the loop below must use <<= (fetch, append, assign)
  # instead of plain <<.
  result = Hash.new { Array.new }
  fkeys = failz.keys
  if fkeys.size > 0
    failures = redis.hmget(@failures, fkeys)
    fkeys.zip(failures).each do |key, details|
      id, queue, r = key.split(',')
      r = r.to_i
      task = self.class.strip_prefix(queue, @que_prefix)
      result[id] <<= {
        task: task, retries: r, ts: failz[key],
        details: details && JSON.parse(details) }
    end
  end
  result
end

#get_failz(earliest = 0, latest = Time.now.to_f, limit = -100) ⇒ Object



311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/balsamique.rb', line 311

# Returns { failure_key => timestamp } for failures recorded between
# `earliest` and `latest`. A negative limit selects the |limit| most
# recent entries (descending scan); a positive limit selects the oldest
# `limit` entries.
def get_failz(earliest = 0, latest = Time.now.to_f, limit = -100)
  values =
    if limit < 0
      redis.zrevrangebyscore(
      @failz, latest, earliest, limit: [0, -limit], with_scores: true)
    else
      redis.zrangebyscore(
      @failz, earliest, latest, limit: [0, limit], with_scores: true)
    end
  values.each_with_object({}) { |(member, score), acc| acc[member] = score }
end

#job_status(*ids) ⇒ Object



435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/balsamique.rb', line 435

# Looks up the status of each given job id in one HMGET and decodes it
# into { id => { task:, timestamps: } }; unknown ids are omitted.
def job_status(*ids)
  statuses = redis.hmget(@status, *ids)
  ids.zip(statuses).each_with_object({}) do |(id, status), result|
    next unless status
    queue, timestamps = decode_job_status(status)
    result[id] = {
      task: self.class.strip_prefix(queue, @que_prefix),
      timestamps: timestamps }
  end
end

#pop_report(timestamp = Time.now.to_f) ⇒ Object



519
520
521
522
523
524
# File 'lib/balsamique.rb', line 519

# Pops the next job due for reporting (score earlier than `timestamp`)
# from the report queue; the popped entry is auto-rescheduled with an
# exponential-backoff delay by the Lua script, pending #complete_report.
# Returns the JSON-decoded [id, ts, retries] payload, or nil when none due.
def pop_report(timestamp = Time.now.to_f)
  raw = redis_eval(
    REPORT_POP_SHA, REPORT_POP, [@report_queue, @retries],
    [timestamp, REPORT_RETRY_DELAY])
  raw && JSON.parse(raw)
end

#push_report(id, timestamp = Time.now.to_f) ⇒ Object



497
498
499
500
501
502
# File 'lib/balsamique.rb', line 497

# Schedules job `id` for reporting at `timestamp`, resetting any stale
# report retry counter in the same MULTI.
def push_report(id, timestamp = Time.now.to_f)
  retry_key = "#{id},#{@report_queue}"
  redis.multi do |r|
    r.hdel(@retries, retry_key)
    r.zadd(@report_queue, timestamp, id)
  end
end

#put_env(topic, h) ⇒ Object



537
538
539
540
541
542
543
# File 'lib/balsamique.rb', line 537

# Writes the key/value pairs of `h` into the environment hash for `topic`.
# Returns true on success, nil when `h` is empty (nothing to write).
def put_env(topic, h)
  return if h.empty?
  kvs = h.flat_map { |k, v| [k, v] }
  hkey = @env_prefix + topic.to_s
  'OK' == redis.hmset(hkey, *kvs)
end

#queue_length(queue) ⇒ Object



374
375
376
# File 'lib/balsamique.rb', line 374

# Number of jobs currently in the given task queue (0 when absent).
def queue_length(queue)
  cardinality = redis.zcard(@que_prefix + queue)
  cardinality || 0
end

#queue_peek(queue, n = 1000) ⇒ Object



378
379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/balsamique.rb', line 378

# Non-destructively inspects up to `n` jobs at the head of `queue`,
# returning { id => { ts:, retries: } }; ts is the scheduled run time and
# retries the dequeue count so far (0 when never dequeued).
def queue_peek(queue, n = 1000)
  qkey = @que_prefix + queue
  result = {}
  redis.zrange(qkey, 0, n - 1, with_scores: true).each do |id, score|
    result[id] = { ts: score }
  end
  return result if result.empty?
  retry_keys = result.keys.map { |id| "#{id},#{qkey}" }
  retries = redis.hmget(@retries, retry_keys)
  result.keys.zip(retries).each do |id, count|
    result[id][:retries] = count.to_i
  end
  result
end

#queue_stats(chunks = 3, latest = Time.now.to_f) ⇒ Object



477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# File 'lib/balsamique.rb', line 477

# Collects dequeue statistics for the most recent `chunks` stats chunks
# ending at `latest`, shaped as { stat => { timestamp => { queue => n } } }.
def queue_stats(chunks = 3, latest = Time.now.to_f)
  last_chunk, last_slice = self.class.slice_timestamp(latest)
  stats = {}
  chunks.times do |offset|
    chunk_ts = self.class.enc36(last_chunk - offset)
    questats_key = @questats_prefix + chunk_ts
    stats_chunk = redis.hgetall(questats_key)
    next unless stats_chunk
    stats_chunk.each do |key, val|
      # Hash fields are "queue_key,stat,slice36".
      queue, stat, slice = key.split(',')
      queue = self.class.strip_prefix(queue, @que_prefix)
      timestamp = self.class.dec36_assemble_timestamp(chunk_ts, slice)
      per_stat = (stats[stat] ||= {})
      per_ts = (per_stat[timestamp] ||= {})
      per_ts[queue] = val.to_i
    end
  end
  stats
end

#queuesObject



337
338
339
340
# File 'lib/balsamique.rb', line 337

# Names of every task queue that has ever been registered in the queues
# hash, with the key prefix stripped.
def queues
  redis.hgetall(@queues).keys.map do |key|
    self.class.strip_prefix(key, @que_prefix)
  end
end

#queues_infoObject



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/balsamique.rb', line 342

# Snapshot of every registered queue. For each queue key in the queues
# hash, reports the last-enqueued job (cached as "id,ts" in the hash
# value), the next pending job (head of the queue zset), how many jobs
# are ready right now (score <= now), and the total queue length.
# Returns {} when no queues are registered.
def queues_info
  qs_info = redis.hgetall(@queues)
  return {} if qs_info.empty?
  now = Time.now.to_f
  # One MULTI issues three commands per queue, so `details` comes back as
  # a flat array consumed in strides of 3 below.
  # NOTE(review): uses withscores: here but with_scores: elsewhere in the
  # file; redis-rb has accepted both -- confirm against the pinned client.
  details = redis.multi do |r|
    qs_info.keys.each do |key|
      r.zrange(key, 0, 0, withscores: true)
      r.zcount(key, 0, now)
      r.zcard(key)
    end
  end
  result = {}
  qs_info.keys.each_with_index do |key, i|
    i3 = 3 * i
    queue = self.class.strip_prefix(key, @que_prefix)
    last_id, last_ts = qs_info[key].split(',')
    last_ts = last_ts.to_f
    next_id = next_ts = nil
    # details[i3] is the zrange result: [[member, score]] or [].
    if (next_info = details[i3].first)
      next_id = next_info.first
      next_ts = next_info.last
    end
    result[queue] = {
      current_ts: now,
      last_id: last_id, last_ts: last_ts,
      total: details[i3 + 2], ready: details[i3 + 1],
      next_id: next_id, next_ts: next_ts
    }
  end
  result
end

#rand_delay(delay = RETRY_DELAY) ⇒ Object



27
28
29
# File 'lib/balsamique.rb', line 27

# A jittered retry delay drawn uniformly from (delay/2, delay], spreading
# out retry storms while never exceeding the configured delay.
def rand_delay(delay = RETRY_DELAY)
  jitter = 0.5 * rand() * delay
  delay - jitter
end

#redisObject



31
32
33
# File 'lib/balsamique.rb', line 31

# The underlying Redis client passed to #initialize.
def redis
  @redis
end

#redis_eval(cmd_sha, cmd, keys, argv) ⇒ Object



35
36
37
38
39
40
# File 'lib/balsamique.rb', line 35

# Runs a Lua script by SHA via EVALSHA, falling back to a full EVAL (which
# also loads the script into Redis' script cache) when the server does not
# have the script yet.
# NOTE(review): the rescue catches every Redis::CommandError, not just
# NOSCRIPT, so a genuinely erroring script is executed (and fails) twice
# -- presumably acceptable here; confirm before tightening.
def redis_eval(cmd_sha, cmd, keys, argv)
  redis.evalsha(cmd_sha, keys, argv)
rescue Redis::CommandError
  puts "[INFO] Balsamique falling back to EVAL for #{cmd_sha}"
  redis.eval(cmd, keys, argv)
end

#remove_job(id) ⇒ Object



404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
# File 'lib/balsamique.rb', line 404

# Removes every trace of job `id`: its queue entry, retry counters,
# failure records, args, and tasks, then its status and uniqueness marker.
# Statement order matters: the status entry is deleted last and only if it
# is unchanged since we read it; if it changed concurrently (e.g. the job
# advanced to another queue), the whole removal is retried recursively.
def remove_job(id)
  return unless (status = redis.hget(@status, id))
  queue, timestamps = decode_job_status(status)
  redis.multi do |r|
    # queue is the raw status field; a '_'-prefixed value means the job
    # already finished and has no live queue entry to clean up.
    if queue.start_with?(@que_prefix)
      r.zrem(queue, id)
      rkey = "#{id},#{queue}"
      r.hdel(@retries, rkey)
      rkeys = []
      # One failure record exists per recorded retry timestamp.
      timestamps.drop(1).each_with_index do |ts, i|
        rkeys << rkey + ",#{i + 1}"
      end
      if rkeys.size > 0
        r.hdel(@failures, rkeys)
        r.zrem(@failz, rkeys)
      end
    end
    r.hdel(@args, id)
    r.hdel(@tasks, id)
  end
  return unless (check_status = redis.hget(@status, id))
  if check_status == status
    # Status unchanged since our read: safe to finish the removal.
    redis.hdel(@status, id)
    if (uid = redis.hget(@unique, id))
      # Drop both directions of the uniqueness mapping (id -> ukey, ukey -> id).
      redis.hdel(@unique, [id, uid])
    end
  else
    # The job moved while we were deleting; start over on the new state.
    remove_job(id)
  end
end

#rm_env(topic, keys = nil) ⇒ Object



545
546
547
548
549
550
551
552
# File 'lib/balsamique.rb', line 545

# Removes environment data for `topic`: the whole hash when `keys` is nil,
# otherwise just the named fields (no-op for an empty key list).
def rm_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  if keys.nil?
    redis.del(hkey)
  else
    redis.hdel(hkey, keys) unless keys.empty?
  end
end

#set_id_floor(id_floor) ⇒ Object



89
90
91
# File 'lib/balsamique.rb', line 89

# Raises the job-id counter to at least `id_floor`, e.g. after restoring
# state, so freshly issued ids never collide with pre-existing ones.
def set_id_floor(id_floor)
  floor = id_floor.to_i
  redis_eval(SET_ID_FLOOR_SHA, SET_ID_FLOOR, [@unique], [floor])
end

#succeed(id, tasks, timestamp = Time.now.to_f) ⇒ Object



228
229
230
231
232
233
234
235
236
237
# File 'lib/balsamique.rb', line 228

# Acknowledges successful completion of the current task of job `id`,
# where `tasks` is the task list updated with that task's result. Clears
# the task's retry/failure bookkeeping, schedules the job for reporting,
# and, when a pending task remains, enqueues the job on that next queue.
# Returns true when the script acknowledged this id.
def succeed(id, tasks, timestamp = Time.now.to_f)
  current_task = self.class.current_task(tasks)
  next_task = self.class.next_task(tasks)
  # KEYS order required by SUCCEED_TASK: [tasks_h, jobstat_h, retries_h,
  # report_z, failz_z, failures_h, cur_queue_z (, next_queue_z, queues_h)]
  keys = [
    @tasks, @status, @retries, @report_queue, @failz, @failures,
    @que_prefix + current_task]
  argv = [id, timestamp, tasks.to_json]
  keys << (@que_prefix + next_task) << @queues if next_task
  id == redis_eval(SUCCEED_TASK_SHA, SUCCEED_TASK, keys, argv)
end