Class: Balsamique
- Inherits:
Object
- Inheritance chain: Object → Balsamique
- Defined in:
- lib/balsamique.rb,
lib/balsamique/reporter.rb
Defined Under Namespace
Classes: Reporter
Constant Summary collapse
- REPORT_RETRY_DELAY =
60.0 (seconds)
- RETRY_DELAY =
600.0 (seconds)
- STATS_SLICE =
60 (seconds)
- STATS_CHUNK =
60 (slices; = 3600 seconds = 1 hr)
- SET_ID_FLOOR =
"local floor = tonumber(ARGV[1])\nlocal id = tonumber(redis.call('hget', KEYS[1], ''))\nif (not id) or floor > id then\n redis.call('hset', KEYS[1], '', floor)\nend\n"- SET_ID_FLOOR_SHA =
Digest::SHA1.hexdigest(SET_ID_FLOOR)
- ENQUEUE_JOB =
Lua script ENQUEUE_JOB takes keys [tasks_h, args_h, jobstat_h, task1_z, queues_h, uniq_h] and args [tasks, args, run_at, uniq]. uniq is optional. If it is present, the script first checks whether the key derived from uniq is already set in the hash uniq_h. If so, the negative of the integer value stored there is returned and the script writes nothing further (the id counter has already been incremented at that point). Otherwise, an integer id is written as that value, the tasks_h hash gets the value of tasks (JSON-encoded task list) written under the key id, the args_h hash gets the value args written under the key id, and task1_z gets id zadded with score run_at. Also, task1_z together with run_at is written to jobstat_h under the key id, and id together with run_at is written to queues_h under the key task1_z. The value returned from the operation is the id. A successful enqueueing is thus signaled by the return of the job id, while an enqueueing blocked by the uniq_in_flight constraint returns minus the blocking id.
"local id = redis.call('hincrby', KEYS[6], '', 1)\nif ARGV[4] then\n local ukey = 'u:' .. ARGV[4]\n local uniq = redis.call('hsetnx', KEYS[6], ukey, id)\n if 0 == uniq then\n return (- redis.call('hget', KEYS[6], ukey))\n else\n redis.call('hset', KEYS[6], id, ukey)\n end\nend\nredis.call('hset', KEYS[1], id, ARGV[1])\nredis.call('hset', KEYS[2], id, ARGV[2])\nredis.call('hset', KEYS[3], id, KEYS[4] .. ',' .. ARGV[3])\nredis.call('zadd', KEYS[4], ARGV[3], id)\nredis.call('hset', KEYS[5], KEYS[4], id .. ',' .. ARGV[3])\nreturn id\n"- ENQUEUE_JOB_SHA =
Digest::SHA1.hexdigest(ENQUEUE_JOB)
- DEQUEUE_TASK =
Lua script DEQUEUE_TASK takes keys
[args_h, tasks_h, questats_h, retries_h, task1_z, ...], and args [timestamp_f, retry_delay, tmod]. It performs a conditional ZPOP on task1_z, where the condition is that the score of the first item is < timestamp_f. If nothing is available to ZPOP, it tries task2_z, etc. If an id is returned from any ZPOP, it increments the retry count in retries_h and reschedules the task accordingly (the id is re-added to the same queue with a later score). Then it writes stats info to questats_h, and returns the job information from args_h and tasks_h.
"local ts = tonumber(ARGV[1])\nlocal i = 5\nwhile KEYS[i] do\n local elem = redis.call('zrange', KEYS[i], 0, 0, 'withscores')\n if elem[2] and tonumber(elem[2]) < ts then\n local retries = redis.call('hincrby', KEYS[4], elem[1] .. ',' .. KEYS[i], 1)\n local t_retry = ts + ARGV[2] * 2 ^ retries\n redis.call('zadd', KEYS[i], t_retry, elem[1])\n redis.call('hset', KEYS[3], KEYS[i] .. ',len,' .. ARGV[3],\n redis.call('zcard', KEYS[i]))\n redis.call('hincrby', KEYS[3], KEYS[i] .. ',dq,' .. ARGV[3], 1)\n redis.call('expire', KEYS[3], 90000)\n return({ elem[1],\n redis.call('hget', KEYS[1], elem[1]),\n redis.call('hget', KEYS[2], elem[1]), retries })\n end\n i = i + 1\nend\n"- DEQUEUE_TASK_SHA =
Digest::SHA1.hexdigest(DEQUEUE_TASK)
# Lua script SUCCEED_TASK.
#
# KEYS, as passed by #succeed:
#   [tasks_h, status_h, retries_h, report_z, failz_z, failures_h,
#    current_queue_z, (next_queue_z, queues_h)]
# ARGV: [id, timestamp, tasks_json]
#
# The script checks that the first unfinished task recorded for the job
# matches the current queue, removes the job from that queue, clears the
# failure records listed in the job status, stores the new task list, adds
# the job to the report queue and, when a next queue key is supplied,
# schedules the job there.  Returns the id, or a Redis error reply.
#
# Fix: the 'missing retry count' error is now built with string.format.
# redis.error_reply takes a single string, so the previous call emitted the
# raw '%s %s' template and dropped the id and queue name.
#
# NOTE(review): the indentation inside the script is reconstructed; the
# rendered listing had its whitespace collapsed.  Lua ignores it, and the
# SHA below is always derived from this exact text.
SUCCEED_TASK = <<'LUA'
local id = ARGV[1]
local ts = ARGV[2]
local tasks = cjson.decode(redis.call('hget', KEYS[1], id))
local cur_task = ''
for _, task in ipairs(tasks) do
  if not task[2] then cur_task = task[1]; break end
end
if (not (string.sub(KEYS[7], - string.len(cur_task)) == cur_task)) then
  return redis.error_reply(
    string.format('task mis-match %s %s %s', id, cur_task, KEYS[7]))
end
if (redis.call('hdel', KEYS[3], id .. ',' .. KEYS[7]) > 0) then
  redis.call('zrem', KEYS[7], id)
else
  return redis.error_reply(
    string.format('missing retry count %s %s', id, KEYS[7]))
end
local status = redis.call('hget', KEYS[2], id)
local i = 0
for r in string.gmatch(status, "[^,]+") do
  i = i + 1
  if (i > 2 and i % 2 == 1) then
    local rkey = id .. ',' .. KEYS[7] .. ',' .. r
    redis.call('zrem', KEYS[5], rkey)
    redis.call('hdel', KEYS[6], rkey)
  end
end
redis.call('hset', KEYS[1], id, ARGV[3])
redis.call('hdel', KEYS[3], id .. ',' .. KEYS[4])
redis.call('zadd', KEYS[4], ts, id)
if (KEYS[8]) then
  redis.call('hset', KEYS[2], id, KEYS[8] .. ',' .. ts)
  redis.call('hdel', KEYS[3], id .. ',' .. KEYS[8])
  redis.call('zadd', KEYS[8], ts, id)
  redis.call('hset', KEYS[9], KEYS[8], id .. ',' .. ts)
else
  redis.call('hset', KEYS[2], id, '_' .. ',' .. ts)
end
return id
LUA
# SHA1 of the script text, used to invoke it via EVALSHA.
SUCCEED_TASK_SHA = Digest::SHA1.hexdigest(SUCCEED_TASK)
- FAIL_TASK =
"local id = ARGV[1]\nlocal ts = ARGV[2]\nlocal tasks = cjson.decode(redis.call('hget', KEYS[1], id))\nlocal cur_task = ''\nfor _, task in ipairs(tasks) do\n if not task[2] then cur_task = task[1]; break end\nend\nif (not (string.sub(ARGV[3], - string.len(cur_task)) == cur_task)) then\n return redis.error_reply(\n string.format('task mismatch %s %s %s', id, cur_task, ARGV[3]))\nend\nlocal rkey = id .. ',' .. ARGV[3]\nlocal retries = tonumber(redis.call('hget', KEYS[3], rkey))\nif (not retries) then\n return redis.error_reply(\n string.format('missing retry count %s %s', id, ARGV[3]))\nend\nrkey = rkey .. ',' .. retries\nredis.call('zadd', KEYS[4], ts, rkey)\nredis.call('hset', KEYS[5], rkey, ARGV[4])\nlocal status = redis.call('hget', KEYS[2], id)\nstatus = status .. ',' .. retries .. ',' .. ts\nredis.call('hset', KEYS[2], id, status)\nredis.call('hdel', KEYS[3], id .. ',' .. KEYS[6])\nredis.call('zadd', KEYS[6], ts, id)\nreturn id\n"- FAIL_TASK_SHA =
Digest::SHA1.hexdigest(FAIL_TASK)
- ACCELERATE_RETRIES =
"local ts = tonumber(ARGV[2])\nlocal incr = tonumber(ARGV[1])\nlocal jobs = redis.call('zrangebyscore', KEYS[1], ts, '+inf')\nlocal count = 0\nfor _, job in ipairs(jobs) do\n redis.call('zadd', KEYS[1], ts, job)\n count = count + 1\n ts = ts + incr\nend\nreturn count\n"- ACCELERATE_RETRIES_SHA =
Digest::SHA1.hexdigest(ACCELERATE_RETRIES)
- REPORT_POP =
"local t_pop = tonumber(ARGV[1])\nlocal elem = redis.call('zrange', KEYS[1], 0, 0, 'withscores')\nlocal t_elem = tonumber(elem[2])\nif (t_elem and t_elem < t_pop) then\n local retries = redis.call('hincrby', KEYS[2], elem[1] .. ',' .. KEYS[1], 1)\n local t_retry = t_pop + tonumber(ARGV[2]) * 2 ^ retries\n redis.call('zadd', KEYS[1], t_retry, elem[1])\n elem[3] = retries\n elem[2] = t_elem\n return cjson.encode(elem)\nend\n"- REPORT_POP_SHA =
Digest::SHA1.hexdigest(REPORT_POP)
- REPORT_COMPLETE =
"if (redis.call('hdel', KEYS[2], ARGV[1] .. ',' .. KEYS[1]) > 0) then\n redis.call('zrem', KEYS[1], ARGV[1])\nend\n"- REPORT_COMPLETE_SHA =
Digest::SHA1.hexdigest(REPORT_COMPLETE)
Class Method Summary collapse
- .assemble_timestamp(chunk, slice) ⇒ Object
- .current_task(tasks) ⇒ Object
- .dec36(s) ⇒ Object
- .dec36_assemble_timestamp(echunk, eslice) ⇒ Object
- .enc36(i) ⇒ Object
- .enc36_slice_timestamp(ts) ⇒ Object
- .next_task(tasks) ⇒ Object
- .slice_timestamp(ts) ⇒ Object
- .strip_prefix(str, prefix) ⇒ Object
Instance Method Summary collapse
- #accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i) ⇒ Object
- #complete_report(id) ⇒ Object
- #decode_job_status(status) ⇒ Object
- #delete_queue(queue) ⇒ Object
- #dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f) ⇒ Object
- #enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f) ⇒ Object
- #fail(id, task, details, timestamp = Time.now.to_f) ⇒ Object
- #failures(*args) ⇒ Object
- #fill_args_tasks(statuses) ⇒ Object
- #fill_job_failures(statuses) ⇒ Object
- #get_env(topic, keys = nil) ⇒ Object
- #get_failures(failz) ⇒ Object
- #get_failz(earliest = 0, latest = Time.now.to_f, limit = -100) ⇒ Object
-
#initialize(redis, namespace = 'bQ') ⇒ Balsamique
constructor
A new instance of Balsamique.
- #job_status(*ids) ⇒ Object
- #pop_report(timestamp = Time.now.to_f) ⇒ Object
- #push_report(id, timestamp = Time.now.to_f) ⇒ Object
- #put_env(topic, h) ⇒ Object
- #queue_length(queue) ⇒ Object
- #queue_peek(queue, n = 1000) ⇒ Object
- #queue_stats(chunks = 3, latest = Time.now.to_f) ⇒ Object
- #queues ⇒ Object
- #queues_info ⇒ Object
- #rand_delay(delay = RETRY_DELAY) ⇒ Object
- #redis ⇒ Object
- #redis_eval(cmd_sha, cmd, keys, argv) ⇒ Object
- #remove_job(id) ⇒ Object
- #rm_env(topic, keys = nil) ⇒ Object
- #set_id_floor(id_floor) ⇒ Object
- #succeed(id, tasks, timestamp = Time.now.to_f) ⇒ Object
Constructor Details
#initialize(redis, namespace = 'bQ') ⇒ Balsamique
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/balsamique.rb', line 6
# Creates a Balsamique instance that talks to Redis through +redis+.
#
# Every key name the instance uses is built here by appending a fixed
# suffix to +namespace+.
#
# @param redis client object, stored as-is and returned by #redis
# @param namespace [String] prefix for every key name (default 'bQ')
def initialize(redis, namespace = 'bQ')
  @redis = redis

  # Prefixes that get a queue name, stats chunk or topic appended later.
  @que_prefix      = namespace + ':que:'
  @questats_prefix = namespace + ':questats:'
  @env_prefix      = namespace + ':env:'

  # Fixed key names.
  @status   = namespace + ':status'
  @queues   = namespace + ':queues'
  @retries  = namespace + ':retries'
  @failures = namespace + ':failures'
  @failz    = namespace + ':failz'
  @unique   = namespace + ':unique'
  @tasks    = namespace + ':tasks'
  @args     = namespace + ':args'

  # The report queue lives under the queue prefix like any other queue.
  @report_queue = @que_prefix + '_report'
end
Class Method Details
.assemble_timestamp(chunk, slice) ⇒ Object
74 75 76 |
# File 'lib/balsamique.rb', line 74
# Inverse of slice_timestamp: turns a (chunk, slice) pair back into the
# timestamp, in seconds, at which that slice starts.
#
# @param chunk [Integer] chunk index
# @param slice [Integer] slice index within the chunk
# @return [Integer] (chunk * STATS_CHUNK + slice) * STATS_SLICE
def self.assemble_timestamp(chunk, slice)
  (chunk * STATS_CHUNK + slice) * STATS_SLICE
end
.current_task(tasks) ⇒ Object
47 48 49 50 |
# File 'lib/balsamique.rb', line 47
# Finds the last entry of +tasks+ that has more than one element and
# returns its first element.
#
# @param tasks [Array<Array>] task list; each entry starts with the task name
# @return [Object, nil] first element of the last multi-element entry, or
#   nil when no entry has more than one element
def self.current_task(tasks)
  latest = tasks.reverse_each.find { |entry| entry.size > 1 }
  latest && latest.first
end
.dec36(s) ⇒ Object
68 69 70 |
# File 'lib/balsamique.rb', line 68
# Parses a base-36 string into an integer.
#
# @param s [String] base-36 digits
# @return [Integer] result of String#to_i with radix 36
def self.dec36(s)
  s.to_i(36)
end
.dec36_assemble_timestamp(echunk, eslice) ⇒ Object
77 78 79 |
# File 'lib/balsamique.rb', line 77
# Decodes a base-36 encoded (chunk, slice) pair and assembles the
# corresponding timestamp.
#
# @param echunk [String] base-36 chunk index
# @param eslice [String] base-36 slice index
# @return [Integer] value of assemble_timestamp for the decoded pair
def self.dec36_assemble_timestamp(echunk, eslice)
  chunk = self.dec36(echunk)
  slice = self.dec36(eslice)
  self.assemble_timestamp(chunk, slice)
end
.enc36(i) ⇒ Object
65 66 67 |
# File 'lib/balsamique.rb', line 65
# Renders an integer as a base-36 string.
#
# @param i [Integer] value to encode
# @return [String] result of Integer#to_s with radix 36
def self.enc36(i)
  i.to_s(36)
end
.enc36_slice_timestamp(ts) ⇒ Object
71 72 73 |
# File 'lib/balsamique.rb', line 71
# Splits a timestamp into its (chunk, slice) pair and base-36 encodes both.
#
# @param ts [Numeric] timestamp in seconds
# @return [Array<String>] [encoded chunk, encoded slice]
def self.enc36_slice_timestamp(ts)
  self.slice_timestamp(ts).map { |part| self.enc36(part) }
end
.next_task(tasks) ⇒ Object
42 43 44 45 |
# File 'lib/balsamique.rb', line 42
# Finds the first entry of +tasks+ that has exactly one element and
# returns that element.
#
# @param tasks [Array<Array>] task list; each entry starts with the task name
# @return [Object, nil] the sole element of the first single-element
#   entry, or nil when there is none
def self.next_task(tasks)
  pending = tasks.find { |entry| entry.size == 1 }
  pending && pending.first
end
.slice_timestamp(ts) ⇒ Object
61 62 63 64 |
# File 'lib/balsamique.rb', line 61
# Splits a timestamp into a chunk index and a slice index within that chunk.
#
# The timestamp is truncated to an integer, divided into slices of
# STATS_SLICE seconds, and the slice number is then split by STATS_CHUNK.
#
# @param ts [Numeric] timestamp in seconds
# @return [Array<Integer>] [chunk, slice]
def self.slice_timestamp(ts)
  slice_no = ts.to_i / STATS_SLICE
  # divmod yields [quotient, remainder], i.e. [chunk, slice within chunk].
  slice_no.divmod(STATS_CHUNK)
end
.strip_prefix(str, prefix) ⇒ Object
52 53 54 55 56 57 |
# File 'lib/balsamique.rb', line 52
# Removes +prefix+ from the start of +str+.
#
# @param str [String] string to strip
# @param prefix [String] expected leading text
# @return [String, nil] the remainder of +str+ after the prefix, or nil
#   when +str+ does not start with +prefix+
def self.strip_prefix(str, prefix)
  return nil unless str.start_with?(prefix)

  str[prefix.size..-1]
end
Instance Method Details
#accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i) ⇒ Object
289 290 291 292 |
# File 'lib/balsamique.rb', line 289
# Runs the ACCELERATE_RETRIES script against one queue.
#
# @param queue [#to_s] queue name; the queue key prefix is prepended
# @param incr [Numeric] passed to the script as ARGV[1]
# @param timestamp [Numeric] passed to the script as ARGV[2]
# @return [Object] whatever the script returns
def accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i)
  # to_s matches how delete_queue builds its key, so symbols work as well.
  queue_key = @que_prefix + queue.to_s
  redis_eval(ACCELERATE_RETRIES_SHA, ACCELERATE_RETRIES,
             [queue_key], [incr, timestamp])
end
#complete_report(id) ⇒ Object
532 533 534 535 |
# File 'lib/balsamique.rb', line 532
# Runs the REPORT_COMPLETE script for +id+ with the report queue and the
# retries hash as keys.
#
# @param id job id passed to the script as ARGV[1]
# @return [Object] whatever the script returns
def complete_report(id)
  keys = [@report_queue, @retries]
  redis_eval(REPORT_COMPLETE_SHA, REPORT_COMPLETE, keys, [id])
end
#decode_job_status(status) ⇒ Object
393 394 395 396 397 398 399 400 401 402 |
# File 'lib/balsamique.rb', line 393
# Parses a comma-separated job status string.
#
# The string has the form "queue,ts[,index,ts]*".  The first timestamp is
# stored at position 0; each following (index, ts) pair stores ts at
# position index.
#
# @param status [String] raw status value
# @return [Array] two values: the queue field (String) and the Array of
#   Float timestamps
def decode_job_status(status)
  queue, first_ts, *pairs = status.split(',')
  timestamps = [first_ts.to_f]
  # A dangling index without a timestamp yields nil.to_f == 0.0.
  pairs.each_slice(2) do |index, ts|
    timestamps[index.to_i] = ts.to_f
  end
  return queue, timestamps
end
#delete_queue(queue) ⇒ Object
329 330 331 332 333 334 335 |
# File 'lib/balsamique.rb', line 329
# Deletes a queue's key and its entry in the queues hash in one MULTI.
#
# @param queue [#to_s] queue name; the queue key prefix is prepended
# @return [Boolean] true when the HDEL on the queues hash reported exactly
#   one removed field
def delete_queue(queue)
  queue_key = @que_prefix + queue.to_s
  replies = redis.multi do |r|
    r.del(queue_key)
    r.hdel(@queues, queue_key)
  end
  # The last reply belongs to the HDEL.
  replies.last == 1
end
#dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f) ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/balsamique.rb', line 171
# Runs the DEQUEUE_TASK script over the queues for +tasks+ and decodes
# the job it returns, if any.
#
# @param tasks [Array<#to_s>] task names whose queues are tried in order
# @param retry_delay [Numeric] passed to the script as ARGV[2]
# @param timestamp [Numeric] passed to the script as ARGV[1]; also selects
#   the queue-stats hash and slice
# @return [Hash, nil] { id:, args:, tasks:, retries: } with args and tasks
#   JSON-decoded, or nil when the script returned nothing
def dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f)
  stats_chunk, stats_slice = self.class.enc36_slice_timestamp(timestamp)
  questats_key = @questats_prefix + stats_chunk

  # Fixed keys first, then one queue key per task.
  keys = [@args, @tasks, questats_key, @retries]
  tasks.each { |task| keys << @que_prefix + task.to_s }

  result = redis_eval(
    DEQUEUE_TASK_SHA, DEQUEUE_TASK, keys,
    [timestamp, retry_delay, stats_slice])
  return nil unless result

  # Distinct local names keep the +tasks+ parameter from being shadowed.
  id, args_json, tasks_json, retries = result
  { id: id, args: JSON.parse(args_json),
    tasks: JSON.parse(tasks_json), retries: retries }
end
#enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f) ⇒ Object
128 129 130 131 132 133 134 135 136 137 |
# File 'lib/balsamique.rb', line 128
# Enqueues a job on the queue of its next task via the ENQUEUE_JOB script.
#
# @param tasks [Array<Array>] task list; serialized with to_json
# @param args [Object] job arguments; serialized with to_json
# @param uniq_in_flight [Object, nil] optional uniqueness key, appended to
#   the script arguments only when truthy
# @param run_at [Numeric] score the job gets in the queue
# @return [Array] two values:
#   - [false, nil] when +tasks+ has no next task;
#   - [true, id] (id as String) when the script returned a positive id;
#   - [false, id] with the absolute value of the script's result otherwise
def enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f)
  next_task = self.class.next_task(tasks)
  return false, nil unless next_task

  queue_key = @que_prefix + next_task.to_s
  keys = [@tasks, @args, @status, queue_key, @queues, @unique]
  argv = [tasks.to_json, args.to_json, run_at]
  argv << uniq_in_flight if uniq_in_flight

  result_id = redis_eval(ENQUEUE_JOB_SHA, ENQUEUE_JOB, keys, argv)
  # The sign carries success or failure; the magnitude is always a job id.
  return result_id > 0, result_id.abs.to_s
end
#fail(id, task, details, timestamp = Time.now.to_f) ⇒ Object
269 270 271 272 273 |
# File 'lib/balsamique.rb', line 269
# Records a task failure for a job via the FAIL_TASK script.
#
# @param id job id
# @param task [#to_s] name of the failed task; the queue key prefix is
#   prepended
# @param details [Object] failure details, serialized with JSON.generate
# @param timestamp [Numeric] passed to the script as ARGV[2]
# @return [Boolean] true when the script's return value equals +id+
def fail(id, task, details, timestamp = Time.now.to_f)
  keys = [@tasks, @status, @retries, @failz, @failures, @report_queue]
  # to_s matches how enqueue and dequeue build queue keys.
  argv = [id, timestamp, @que_prefix + task.to_s, JSON.generate(details)]
  id == redis_eval(FAIL_TASK_SHA, FAIL_TASK, keys, argv)
end
#failures(*args) ⇒ Object
325 326 327 |
# File 'lib/balsamique.rb', line 325
# Convenience wrapper: fetches failure keys with get_failz and resolves
# them with get_failures.
#
# @param args arguments forwarded unchanged to get_failz
# @return [Hash] result of get_failures
def failures(*args)
  failz = get_failz(*args)
  get_failures(failz)
end
#fill_args_tasks(statuses) ⇒ Object
465 466 467 468 469 470 471 472 473 474 475 |
# File 'lib/balsamique.rb', line 465
# Adds :args and :tasks entries to each status hash in +statuses+.
#
# Both values are read with HMGET inside one MULTI and JSON-decoded; a
# missing value is stored as nil.
#
# @param statuses [Hash] job id => status hash; modified in place
# @return [Array] the zipped [id, args, tasks] triples that were processed
def fill_args_tasks(statuses)
  ids = statuses.keys
  # HMGET needs at least one field, so skip Redis when there is nothing to
  # look up.  [] is what zipping an empty id list would have produced.
  return [] if ids.empty?

  args, tasks = redis.multi do |r|
    r.hmget(@args, ids)
    r.hmget(@tasks, ids)
  end
  ids.zip(args, tasks).each do |id, raw_args, raw_tasks|
    statuses[id][:args] = raw_args && JSON.parse(raw_args)
    statuses[id][:tasks] = raw_tasks && JSON.parse(raw_tasks)
  end
end
#fill_job_failures(statuses) ⇒ Object
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 |
# File 'lib/balsamique.rb', line 448
# Adds a :failures entry to every status that has recorded retries.
#
# For each status with a :task and more than one timestamp, a failure key
# "id,queue,n" is built for every timestamp after the first and resolved
# through get_failures.
#
# @param statuses [Hash] job id => status hash with :task and :timestamps;
#   modified in place
# @return [Hash] +statuses+
def fill_job_failures(statuses)
  failz = {}
  statuses.each do |id, status|
    next unless (task = status[:task])
    timestamps = status[:timestamps]
    next unless timestamps.size > 1
    queue = @que_prefix + task
    # Index 0 is the status timestamp itself; the rest belong to retries.
    timestamps.drop(1).each_with_index do |ts, i|
      failz["#{id},#{queue},#{i + 1}"] = ts
    end
  end
  get_failures(failz).each do |id, failures|
    statuses[id][:failures] = failures
  end
  statuses
end
#get_env(topic, keys = nil) ⇒ Object
554 555 556 557 558 559 560 561 562 563 564 565 566 |
# File 'lib/balsamique.rb', line 554
# Reads values from the environment hash of +topic+.
#
# @param topic [#to_s] topic name; the env key prefix is prepended
# @param keys [Array, nil] fields to read; nil reads the whole hash
# @return [Hash] field => value; {} when +keys+ is empty (no Redis call)
def get_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.hgetall(hkey) if keys.nil?
  return {} if keys.empty?

  values = redis.hmget(hkey, keys)
  result = {}
  keys.zip(values).each { |k, v| result[k] = v }
  result
end
#get_failures(failz) ⇒ Object
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/balsamique.rb', line 294
# Resolves failure keys into per-job failure records.
#
# Each key has the form "id,queue,retries"; its details are read from the
# failures hash and JSON-decoded when present.
#
# @param failz [Hash] failure key => timestamp
# @return [Hash] job id => Array of { task:, retries:, ts:, details: }.
#   The hash's default block returns a fresh, unstored Array for ids that
#   have no entry.
def get_failures(failz)
  # The default block does not store the array; `<<=` below does, because
  # it expands to `result[id] = result[id] << record`.
  result = Hash.new { Array.new }
  fkeys = failz.keys
  return result if fkeys.empty?

  failures = redis.hmget(@failures, fkeys)
  fkeys.zip(failures).each do |key, details|
    id, queue, r = key.split(',')
    result[id] <<= {
      task: self.class.strip_prefix(queue, @que_prefix),
      retries: r.to_i,
      ts: failz[key],
      details: details && JSON.parse(details)
    }
  end
  result
end
#get_failz(earliest = 0, latest = Time.now.to_f, limit = -100) ⇒ Object
311 312 313 314 315 316 317 318 319 320 321 322 323 |
# File 'lib/balsamique.rb', line 311
# Reads failure keys and their scores from the failz sorted set.
#
# @param earliest [Numeric] lower score bound
# @param latest [Numeric] upper score bound
# @param limit [Integer] maximum number of entries.  A negative value
#   reads in descending score order and takes up to -limit entries; a
#   non-negative value reads in ascending order.
# @return [Hash] member => score
def get_failz(earliest = 0, latest = Time.now.to_f, limit = -100)
  values =
    if limit < 0
      redis.zrevrangebyscore(
        @failz, latest, earliest, limit: [0, -limit], with_scores: true)
    else
      redis.zrangebyscore(
        @failz, earliest, latest, limit: [0, limit], with_scores: true)
    end
  result = {}
  values.each { |member, score| result[member] = score }
  result
end
#job_status(*ids) ⇒ Object
435 436 437 438 439 440 441 442 443 444 445 446 |
# File 'lib/balsamique.rb', line 435
# Looks up and decodes the status of the given jobs.
#
# @param ids job ids
# @return [Hash] id => { task:, timestamps: } for every id that has a
#   status; ids without a status are omitted.  :task is the queue field
#   with the queue key prefix stripped (nil when it has no such prefix).
def job_status(*ids)
  # HMGET needs at least one field; no ids means nothing to report.
  return {} if ids.empty?

  statuses = redis.hmget(@status, *ids)
  result = {}
  ids.zip(statuses).each do |(id, status)|
    next unless status
    queue, timestamps = decode_job_status(status)
    result[id] = {
      task: self.class.strip_prefix(queue, @que_prefix),
      timestamps: timestamps }
  end
  result
end
#pop_report(timestamp = Time.now.to_f) ⇒ Object
519 520 521 522 523 524 |
# File 'lib/balsamique.rb', line 519
# Runs the REPORT_POP script on the report queue and decodes its reply.
#
# @param timestamp [Numeric] passed to the script as ARGV[1]
# @return [Object, nil] the JSON-decoded script result, or nil when the
#   script returned nothing
def pop_report(timestamp = Time.now.to_f)
  raw = redis_eval(
    REPORT_POP_SHA, REPORT_POP, [@report_queue, @retries],
    [timestamp, REPORT_RETRY_DELAY])
  raw && JSON.parse(raw)
end
#push_report(id, timestamp = Time.now.to_f) ⇒ Object
497 498 499 500 501 502 |
# File 'lib/balsamique.rb', line 497
# Puts a job on the report queue and clears its report retry counter, in
# one MULTI.
#
# @param id job id
# @param timestamp [Numeric] score the id gets in the report queue
# @return [Array] replies of the MULTI block
def push_report(id, timestamp = Time.now.to_f)
  retry_field = "#{id},#{@report_queue}"
  redis.multi do |r|
    r.hdel(@retries, retry_field)
    r.zadd(@report_queue, timestamp, id)
  end
end
#put_env(topic, h) ⇒ Object
537 538 539 540 541 542 543 |
# File 'lib/balsamique.rb', line 537
# Writes the pairs of +h+ into the environment hash of +topic+.
#
# @param topic [#to_s] topic name; the env key prefix is prepended
# @param h [Hash] field => value pairs to store
# @return [Boolean, nil] nil when +h+ is empty (nothing is written);
#   otherwise whether HMSET replied 'OK'
def put_env(topic, h)
  return if h.empty?

  # HMSET takes a flat field, value, field, value, ... list.
  kvs = []
  h.each_pair { |field, value| kvs.push(field, value) }
  hkey = @env_prefix + topic.to_s
  'OK' == redis.hmset(hkey, *kvs)
end
#queue_length(queue) ⇒ Object
374 375 376 |
# File 'lib/balsamique.rb', line 374
# Number of entries in a queue.
#
# @param queue [#to_s] queue name; the queue key prefix is prepended
# @return [Integer] ZCARD of the queue key, or 0 when the reply is nil
def queue_length(queue)
  # to_s matches how delete_queue builds its key, so symbols work as well.
  redis.zcard(@que_prefix + queue.to_s) || 0
end
#queue_peek(queue, n = 1000) ⇒ Object
378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
# File 'lib/balsamique.rb', line 378
# Lists the first +n+ entries of a queue with their scores and retry
# counts, without modifying the queue.
#
# @param queue [#to_s] queue name; the queue key prefix is prepended
# @param n [Integer] maximum number of entries to read
# @return [Hash] id => { ts:, retries: }; {} when the queue is empty.
#   A missing retry counter shows up as 0 (nil.to_i).
def queue_peek(queue, n = 1000)
  # to_s matches how delete_queue builds its key, so symbols work as well.
  qkey = @que_prefix + queue.to_s
  result = {}
  redis.zrange(qkey, 0, n - 1, with_scores: true).each do |id, score|
    result[id] = { ts: score }
  end
  return result if result.empty?

  ids = result.keys
  retries = redis.hmget(@retries, ids.map { |id| "#{id},#{qkey}" })
  ids.zip(retries).each do |id, count|
    result[id][:retries] = count.to_i
  end
  result
end
#queue_stats(chunks = 3, latest = Time.now.to_f) ⇒ Object
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 |
# File 'lib/balsamique.rb', line 477
# Collects queue statistics from the most recent stats chunks.
#
# Reads the stats hash of the chunk containing +latest+ and of the
# chunks before it.  Each field has the form "queue,stat,slice".
#
# @param chunks [Integer] number of chunks to read, newest first
# @param latest [Numeric] timestamp selecting the newest chunk
# @return [Hash] stat => { timestamp => { queue => Integer } }, where
#   timestamp is the start of the slice and queue has its key prefix
#   stripped
def queue_stats(chunks = 3, latest = Time.now.to_f)
  # Only the chunk index is needed; the slice of +latest+ is unused.
  last_chunk, = self.class.slice_timestamp(latest)
  stats = {}
  (0..(chunks - 1)).each do |chunk_i|
    chunk_ts = self.class.enc36(last_chunk - chunk_i)
    questats_key = @questats_prefix + chunk_ts
    stats_chunk = redis.hgetall(questats_key)
    next unless stats_chunk
    stats_chunk.each do |key, val|
      queue, stat, slice = key.split(',')
      queue = self.class.strip_prefix(queue, @que_prefix)
      timestamp = self.class.dec36_assemble_timestamp(chunk_ts, slice)
      stats[stat] ||= {}
      stats[stat][timestamp] ||= {}
      stats[stat][timestamp][queue] = val.to_i
    end
  end
  stats
end
#queues ⇒ Object
337 338 339 340 |
# File 'lib/balsamique.rb', line 337
# Names of all queues recorded in the queues hash.
#
# @return [Array<String, nil>] the hash's field names with the queue key
#   prefix stripped (nil for a field without that prefix)
def queues
  queue_keys = redis.hgetall(@queues).keys
  queue_keys.map { |key| self.class.strip_prefix(key, @que_prefix) }
end
#queues_info ⇒ Object
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 |
# File 'lib/balsamique.rb', line 342
# Summarizes every queue recorded in the queues hash.
#
# For each queue, three commands are issued inside one MULTI: ZRANGE for
# the first entry with its score, ZCOUNT from 0 to now, and ZCARD.
#
# @return [Hash] queue name (key prefix stripped) => {
#     current_ts:, last_id:, last_ts:, total:, ready:, next_id:, next_ts: }.
#   last_id and last_ts come from the "id,ts" value in the queues hash;
#   next_id and next_ts are nil when the queue is empty.
def queues_info
  qs_info = redis.hgetall(@queues)
  return {} if qs_info.empty?

  now = Time.now.to_f
  details = redis.multi do |r|
    qs_info.keys.each do |key|
      # with_scores: is the spelling used by the other methods in this class.
      r.zrange(key, 0, 0, with_scores: true)
      r.zcount(key, 0, now)
      r.zcard(key)
    end
  end

  result = {}
  qs_info.keys.each_with_index do |key, i|
    # Three replies per queue, in the order the commands were issued.
    head, ready, total = details[3 * i, 3]
    queue = self.class.strip_prefix(key, @que_prefix)
    last_id, last_ts = qs_info[key].split(',')
    last_ts = last_ts.to_f
    next_id = next_ts = nil
    if (next_info = head.first)
      next_id = next_info.first
      next_ts = next_info.last
    end
    result[queue] = {
      current_ts: now,
      last_id: last_id,
      last_ts: last_ts,
      total: total,
      ready: ready,
      next_id: next_id,
      next_ts: next_ts
    }
  end
  result
end
#rand_delay(delay = RETRY_DELAY) ⇒ Object
27 28 29 |
# File 'lib/balsamique.rb', line 27
# Picks a randomized delay between half of +delay+ and +delay+.
#
# @param delay [Numeric] upper bound of the delay
# @return [Float] delay minus a random fraction (below one half) of itself
def rand_delay(delay = RETRY_DELAY)
  jitter = 0.5 * rand() * delay
  delay - jitter
end
#redis ⇒ Object
31 32 33 |
# File 'lib/balsamique.rb', line 31
# The Redis client given to the constructor.
#
# @return the object stored in @redis
def redis
  @redis
end
#redis_eval(cmd_sha, cmd, keys, argv) ⇒ Object
35 36 37 38 39 40 |
# File 'lib/balsamique.rb', line 35
# Runs a Lua script, preferring EVALSHA and falling back to EVAL when the
# server does not have the script cached.
#
# @param cmd_sha [String] SHA1 of the script
# @param cmd [String] script source, used for the EVAL fallback
# @param keys [Array] KEYS passed to the script
# @param argv [Array] ARGV passed to the script
# @return [Object] the script's result
# @raise [Redis::CommandError] any command error other than NOSCRIPT, and
#   any error raised by the EVAL fallback itself
def redis_eval(cmd_sha, cmd, keys, argv)
  redis.evalsha(cmd_sha, keys, argv)
rescue Redis::CommandError => e
  # Only a missing script justifies re-sending the source.  Re-running on
  # any other error (e.g. an error reply produced by the script) would
  # execute the script a second time.
  raise unless e.message.to_s.start_with?('NOSCRIPT')

  puts "[INFO] Balsamique falling back to EVAL for #{cmd_sha}"
  redis.eval(cmd, keys, argv)
end
#remove_job(id) ⇒ Object
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 |
# File 'lib/balsamique.rb', line 404
# Deletes a job and the records attached to it.
#
# Inside one MULTI the job is removed from its queue (when its status
# names one), its retry counter and failure records are dropped, and its
# args and tasks are deleted.  The status is then re-read: if unchanged,
# the status and uniqueness entries are deleted; if it changed in the
# meantime, the whole removal is repeated.
#
# @param id job id
# @return [nil] when the job has no status, before or after the MULTI;
#   otherwise the result of the last Redis call made
def remove_job(id)
  return unless (status = redis.hget(@status, id))
  queue, timestamps = decode_job_status(status)
  redis.multi do |r|
    if queue.start_with?(@que_prefix)
      r.zrem(queue, id)
      rkey = "#{id},#{queue}"
      r.hdel(@retries, rkey)
      # One failure record per timestamp after the first, numbered from 1.
      rkeys = []
      timestamps.drop(1).each_with_index do |_ts, i|
        rkeys << rkey + ",#{i + 1}"
      end
      if rkeys.size > 0
        r.hdel(@failures, rkeys)
        r.zrem(@failz, rkeys)
      end
    end
    r.hdel(@args, id)
    r.hdel(@tasks, id)
  end
  return unless (check_status = redis.hget(@status, id))
  if check_status == status
    redis.hdel(@status, id)
    # The unique hash maps id => uniqueness key and back; drop both fields.
    if (uid = redis.hget(@unique, id))
      redis.hdel(@unique, [id, uid])
    end
  else
    # The status changed while we were cleaning up; start over.
    remove_job(id)
  end
end
#rm_env(topic, keys = nil) ⇒ Object
545 546 547 548 549 550 551 552 |
# File 'lib/balsamique.rb', line 545
# Removes values from the environment hash of +topic+.
#
# @param topic [#to_s] topic name; the env key prefix is prepended
# @param keys [Array, nil] fields to remove; nil deletes the whole hash
# @return [Object, nil] reply of DEL or HDEL; nil when +keys+ is empty
#   (no Redis call is made)
def rm_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.del(hkey) if keys.nil?

  redis.hdel(hkey, keys) unless keys.empty?
end
#set_id_floor(id_floor) ⇒ Object
89 90 91 |
# File 'lib/balsamique.rb', line 89
# Runs the SET_ID_FLOOR script on the unique hash.
#
# @param id_floor [#to_i] floor value, converted with to_i before sending
# @return [Object] whatever the script returns
def set_id_floor(id_floor)
  floor = id_floor.to_i
  redis_eval(SET_ID_FLOOR_SHA, SET_ID_FLOOR, [@unique], [floor])
end
#succeed(id, tasks, timestamp = Time.now.to_f) ⇒ Object
228 229 230 231 232 233 234 235 236 237 |
# File 'lib/balsamique.rb', line 228
# Marks the current task of a job as succeeded via the SUCCEED_TASK script.
#
# @param id job id
# @param tasks [Array<Array>] updated task list; serialized with to_json.
#   Its current task selects the queue to leave and its next task, if any,
#   the queue to enter.
# @param timestamp [Numeric] passed to the script as ARGV[2]
# @return [Boolean] true when the script's return value equals +id+
def succeed(id, tasks, timestamp = Time.now.to_f)
  current_task = self.class.current_task(tasks)
  next_task = self.class.next_task(tasks)
  keys = [
    @tasks, @status, @retries, @report_queue, @failz, @failures,
    @que_prefix + current_task]
  argv = [id, timestamp, tasks.to_json]
  # The next queue and the queues hash are passed only when a task remains.
  keys << (@que_prefix + next_task) << @queues if next_task
  id == redis_eval(SUCCEED_TASK_SHA, SUCCEED_TASK, keys, argv)
end