Class: Balsamique
- Inherits:
-
Object
- Object
- Balsamique
- Defined in:
- lib/balsamique.rb,
lib/balsamique/reporter.rb
Defined Under Namespace
Classes: Reporter
Constant Summary collapse
# Base delay before re-popping an unacknowledged report.
REPORT_RETRY_DELAY = 60.0 # seconds

# Base delay for rescheduling a dequeued-but-unfinished task.
RETRY_DELAY = 600.0 # seconds

# Width of one queue-stats time slice.
STATS_SLICE = 60 # seconds

# Number of slices per stats chunk.
STATS_CHUNK = 60 # slices (= 3600 seconds = 1 hr)
# Lua script: raises the job-id counter (stored under the empty-string
# field of the uniq hash, KEYS[1]) to at least ARGV[1].  No-op when the
# counter already exceeds the requested floor.
SET_ID_FLOOR = <<EOF
local floor = tonumber(ARGV[1])
local id = tonumber(redis.call('hget', KEYS[1], ''))
if (not id) or floor > id then
  redis.call('hset', KEYS[1], '', floor)
end
EOF
SET_ID_FLOOR_SHA = Digest::SHA1.hexdigest(SET_ID_FLOOR)
# Lua script ENQUEUE_JOB takes keys
# [tasks_h, args_h, jobstat_h, task1_z, queues_h, uniq_h]
# and args [tasks, args, run_at, uniq].  uniq is optional.  If present,
# the script first checks whether the key uniq is already set in the
# hash uniq_h; if so, the negative of the integer value stored there is
# returned and nothing else happens.  Otherwise a fresh integer id is
# recorded under that key, tasks_h gets the JSON-encoded task list under
# id, args_h gets args under id, and task1_z gets id zadded with score
# run_at.  task1_z (plus run_at) is also written to jobstat_h under id.
# A successful enqueue thus returns the job id; an enqueue blocked by
# the uniq_in_flight constraint returns minus the blocking id.
ENQUEUE_JOB = <<EOF
local id = redis.call('hincrby', KEYS[6], '', 1)
if ARGV[4] then
  local ukey = 'u:' .. ARGV[4]
  local uniq = redis.call('hsetnx', KEYS[6], ukey, id)
  if 0 == uniq then
    return (- redis.call('hget', KEYS[6], ukey))
  else
    redis.call('hset', KEYS[6], id, ukey)
  end
end
redis.call('hset', KEYS[1], id, ARGV[1])
redis.call('hset', KEYS[2], id, ARGV[2])
redis.call('hset', KEYS[3], id, KEYS[4] .. ',' .. ARGV[3])
redis.call('zadd', KEYS[4], ARGV[3], id)
redis.call('hset', KEYS[5], KEYS[4], id .. ',' .. ARGV[3])
return id
EOF
ENQUEUE_JOB_SHA = Digest::SHA1.hexdigest(ENQUEUE_JOB)
# Lua script DEQUEUE_TASK takes keys
# [args_h, tasks_h, questats_h, retries_h, task1_z, ...]
# and args [timestamp_f, retry_delay, tmod].  It performs a conditional
# ZPOP on task1_z, where the condition is that the score of the first
# item is strictly less than timestamp_f.  If nothing is poppable it
# tries task2_z, and so on.  When an id pops, its retry count in
# retries_h is incremented and the task is rescheduled with exponential
# backoff (retry_delay * 2^retries).  Queue length/dequeue stats are
# written to questats_h (expiring after 90000 s), and the job's args,
# tasks and retry count are returned.
DEQUEUE_TASK = <<EOF
local ts = tonumber(ARGV[1])
local i = 5
while KEYS[i] do
  local elem = redis.call('zrange', KEYS[i], 0, 0, 'withscores')
  if elem[2] and tonumber(elem[2]) < ts then
    local retries = redis.call('hincrby', KEYS[4], elem[1] .. ',' .. KEYS[i], 1)
    local t_retry = ts + ARGV[2] * 2 ^ retries
    redis.call('zadd', KEYS[i], t_retry, elem[1])
    redis.call('hset', KEYS[3], KEYS[i] .. ',len,' .. ARGV[3], redis.call('zcard', KEYS[i]))
    redis.call('hincrby', KEYS[3], KEYS[i] .. ',dq,' .. ARGV[3], 1)
    redis.call('expire', KEYS[3], 90000)
    return({ elem[1], redis.call('hget', KEYS[1], elem[1]), redis.call('hget', KEYS[2], elem[1]), retries })
  end
  i = i + 1
end
EOF
DEQUEUE_TASK_SHA = Digest::SHA1.hexdigest(DEQUEUE_TASK)
# Lua script SUCCEED_TASK: marks the current task of job ARGV[1] as
# finished at time ARGV[2], verifying first that KEYS[7] really is the
# queue of the job's current task and that a retry count exists.  Clears
# the job's failz/failures entries for this task, stores the updated
# task list (ARGV[3]), and either advances the job onto the next queue
# KEYS[8] (recording it in the queues hash KEYS[9]) or, when there is no
# next task, records terminal status '_,<ts>'.
#
# Fix: the 'missing retry count' branch passed format arguments directly
# to redis.error_reply, which takes a single message string (the args
# were silently dropped); it now wraps them in string.format like the
# 'task mis-match' branch does.
SUCCEED_TASK = <<EOF
local id = ARGV[1]
local ts = ARGV[2]
local tasks = cjson.decode(redis.call('hget', KEYS[1], id))
local cur_task = ''
for _, task in ipairs(tasks) do
  if not task[2] then cur_task = task[1]; break end
end
if (not (string.sub(KEYS[7], - string.len(cur_task)) == cur_task)) then
  return redis.error_reply(
    string.format('task mis-match %s %s %s', id, cur_task, KEYS[7]))
end
if (redis.call('hdel', KEYS[3], id .. ',' .. KEYS[7]) > 0) then
  redis.call('zrem', KEYS[7], id)
else
  return redis.error_reply(
    string.format('missing retry count %s %s', id, KEYS[7]))
end
local status = redis.call('hget', KEYS[2], id)
local i = 0
for r in string.gmatch(status, "[^,]+") do
  i = i + 1
  if (i > 2 and i % 2 == 1) then
    local rkey = id .. ',' .. KEYS[7] .. ',' .. r
    redis.call('zrem', KEYS[5], rkey)
    redis.call('hdel', KEYS[6], rkey)
  end
end
redis.call('hset', KEYS[1], id, ARGV[3])
redis.call('hdel', KEYS[3], id .. ',' .. KEYS[4])
redis.call('zadd', KEYS[4], ts, id)
if (KEYS[8]) then
  redis.call('hset', KEYS[2], id, KEYS[8] .. ',' .. ts)
  redis.call('hdel', KEYS[3], id .. ',' .. KEYS[8])
  redis.call('zadd', KEYS[8], ts, id)
  redis.call('hset', KEYS[9], KEYS[8], id .. ',' .. ts)
else
  redis.call('hset', KEYS[2], id, '_' .. ',' .. ts)
end
return id
EOF
SUCCEED_TASK_SHA = Digest::SHA1.hexdigest(SUCCEED_TASK)
# Lua script FAIL_TASK: records a failure of job ARGV[1]'s current task
# on queue ARGV[3] at time ARGV[2] with details ARGV[4].  Verifies the
# task matches and that a retry count exists, appends (retries, ts) to
# the job status, stores the failure details, and schedules the job for
# reporting on KEYS[6].
FAIL_TASK = <<EOF
local id = ARGV[1]
local ts = ARGV[2]
local tasks = cjson.decode(redis.call('hget', KEYS[1], id))
local cur_task = ''
for _, task in ipairs(tasks) do
  if not task[2] then cur_task = task[1]; break end
end
if (not (string.sub(ARGV[3], - string.len(cur_task)) == cur_task)) then
  return redis.error_reply(
    string.format('task mismatch %s %s %s', id, cur_task, ARGV[3]))
end
local rkey = id .. ',' .. ARGV[3]
local retries = tonumber(redis.call('hget', KEYS[3], rkey))
if (not retries) then
  return redis.error_reply(
    string.format('missing retry count %s %s', id, ARGV[3]))
end
rkey = rkey .. ',' .. retries
redis.call('zadd', KEYS[4], ts, rkey)
redis.call('hset', KEYS[5], rkey, ARGV[4])
local status = redis.call('hget', KEYS[2], id)
status = status .. ',' .. retries .. ',' .. ts
redis.call('hset', KEYS[2], id, status)
redis.call('hdel', KEYS[3], id .. ',' .. KEYS[6])
redis.call('zadd', KEYS[6], ts, id)
return id
EOF
FAIL_TASK_SHA = Digest::SHA1.hexdigest(FAIL_TASK)
# Lua script ACCELERATE_RETRIES: pulls every member of queue KEYS[1]
# whose score is >= ARGV[2] forward to run starting at ARGV[2], spacing
# successive members ARGV[1] apart.  Returns how many were rescheduled.
ACCELERATE_RETRIES = <<EOF
local ts = tonumber(ARGV[2])
local incr = tonumber(ARGV[1])
local jobs = redis.call('zrangebyscore', KEYS[1], ts, '+inf')
local count = 0
for _, job in ipairs(jobs) do
  redis.call('zadd', KEYS[1], ts, job)
  count = count + 1
  ts = ts + incr
end
return count
EOF
ACCELERATE_RETRIES_SHA = Digest::SHA1.hexdigest(ACCELERATE_RETRIES)
# Lua script RESCHEDULE: for each id in ARGV[2..], if it is currently a
# member of queue KEYS[1], reset its score to ARGV[1].  Ids not present
# in the queue are left untouched.
RESCHEDULE = <<EOF
local j = 2
while ARGV[j] do
  if redis.call('zscore', KEYS[1], ARGV[j]) then
    redis.call('zadd', KEYS[1], ARGV[1], ARGV[j])
  end
  j = j + 1
end
EOF
RESCHEDULE_SHA = Digest::SHA1.hexdigest(RESCHEDULE)
# Lua script REPORT_POP: conditionally pops the report queue KEYS[1].
# If the head's score is < ARGV[1], its retry count in KEYS[2] is
# incremented, it is rescheduled with exponential backoff (ARGV[2] *
# 2^retries), and a JSON array [id, orig_score, retries] is returned.
REPORT_POP = <<EOF
local t_pop = tonumber(ARGV[1])
local elem = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
local t_elem = tonumber(elem[2])
if (t_elem and t_elem < t_pop) then
  local retries = redis.call('hincrby', KEYS[2], elem[1] .. ',' .. KEYS[1], 1)
  local t_retry = t_pop + tonumber(ARGV[2]) * 2 ^ retries
  redis.call('zadd', KEYS[1], t_retry, elem[1])
  elem[3] = retries
  elem[2] = t_elem
  return cjson.encode(elem)
end
EOF
REPORT_POP_SHA = Digest::SHA1.hexdigest(REPORT_POP)
# Lua script REPORT_COMPLETE: acknowledges a report for id ARGV[1] by
# deleting its retry-count entry from KEYS[2]; only if that entry
# existed is the id removed from the report queue KEYS[1].
REPORT_COMPLETE = <<EOF
if (redis.call('hdel', KEYS[2], ARGV[1] .. ',' .. KEYS[1]) > 0) then
  redis.call('zrem', KEYS[1], ARGV[1])
end
EOF
REPORT_COMPLETE_SHA = Digest::SHA1.hexdigest(REPORT_COMPLETE)
Class Method Summary collapse
- .assemble_timestamp(chunk, slice) ⇒ Object
- .current_task(tasks) ⇒ Object
- .dec36(s) ⇒ Object
- .dec36_assemble_timestamp(echunk, eslice) ⇒ Object
- .enc36(i) ⇒ Object
- .enc36_slice_timestamp(ts) ⇒ Object
- .next_task(tasks) ⇒ Object
- .slice_timestamp(ts) ⇒ Object
- .strip_prefix(str, prefix) ⇒ Object
Instance Method Summary collapse
- #accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i) ⇒ Object
- #complete_report(id) ⇒ Object
- #decode_job_status(status) ⇒ Object
- #delete_queue(queue) ⇒ Object
- #dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f) ⇒ Object
- #enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f) ⇒ Object
- #fail(id, task, details, timestamp = Time.now.to_f) ⇒ Object
- #failures(*args) ⇒ Object
- #fill_args_tasks(statuses) ⇒ Object
- #fill_job_failures(statuses) ⇒ Object
- #get_env(topic, keys = nil) ⇒ Object
- #get_failures(failz) ⇒ Object
- #get_failz(earliest = 0, latest = Time.now.to_f, limit = -100) ⇒ Object
-
#initialize(redis, namespace = 'bQ') ⇒ Balsamique
constructor
A new instance of Balsamique.
- #job_status(*ids) ⇒ Object
- #pop_report(timestamp = Time.now.to_f) ⇒ Object
- #push_report(id, timestamp = Time.now.to_f) ⇒ Object
- #put_env(topic, h) ⇒ Object
- #queue_length(queue) ⇒ Object
- #queue_peek(queue, n = 1000) ⇒ Object
- #queue_stats(chunks = 3, latest = Time.now.to_f) ⇒ Object
- #queues ⇒ Object
- #queues_info ⇒ Object
- #rand_delay(delay = RETRY_DELAY) ⇒ Object
- #redis ⇒ Object
- #redis_eval(cmd_sha, cmd, keys, argv) ⇒ Object
- #remove_job(id) ⇒ Object
- #reschedule(queue, ids, timestamp = Time.now.to_f) ⇒ Object
- #rm_env(topic, keys = nil) ⇒ Object
- #set_id_floor(id_floor) ⇒ Object
- #succeed(id, tasks, timestamp = Time.now.to_f) ⇒ Object
Constructor Details
#initialize(redis, namespace = 'bQ') ⇒ Balsamique
Returns a new instance of Balsamique.
# File 'lib/balsamique.rb', line 6

# Builds a Balsamique client on top of an existing redis connection.
# All keys live under the given namespace (default 'bQ').
def initialize(redis, namespace = 'bQ')
  @redis = redis
  # Prefixes for per-queue / per-chunk / per-topic keys.
  @que_prefix      = namespace + ':que:'
  @questats_prefix = namespace + ':questats:'
  @env_prefix      = namespace + ':env:'
  # Fixed hash/zset keys.
  @status   = namespace + ':status'
  @queues   = namespace + ':queues'
  @retries  = namespace + ':retries'
  @failures = namespace + ':failures'
  @failz    = namespace + ':failz'
  @unique   = namespace + ':unique'
  @tasks    = namespace + ':tasks'
  @args     = namespace + ':args'
  # The report queue is an ordinary queue with a reserved name.
  @report_queue = @que_prefix + '_report'
end
Class Method Details
.assemble_timestamp(chunk, slice) ⇒ Object
# File 'lib/balsamique.rb', line 74

# Inverse of .slice_timestamp: rebuilds an epoch timestamp (seconds)
# from a (chunk, slice) pair.
def self.assemble_timestamp(chunk, slice)
  (chunk * STATS_CHUNK + slice) * STATS_SLICE
end
.current_task(tasks) ⇒ Object
# File 'lib/balsamique.rb', line 47

# Name of the most recently completed task: the last entry in the task
# list that already carries a result (size > 1), or nil if none do.
def self.current_task(tasks)
  completed = tasks.reverse.find { |entry| entry.size > 1 }
  completed && completed.first
end
.dec36(s) ⇒ Object
# File 'lib/balsamique.rb', line 68

# Decodes a base-36 string back to an integer.
def self.dec36(s)
  s.to_i(36)
end
.dec36_assemble_timestamp(echunk, eslice) ⇒ Object
# File 'lib/balsamique.rb', line 77

# Rebuilds an epoch timestamp from base-36-encoded chunk and slice
# strings (as stored in questats hash keys).
def self.dec36_assemble_timestamp(echunk, eslice)
  self.assemble_timestamp(self.dec36(echunk), self.dec36(eslice))
end
.enc36(i) ⇒ Object
# File 'lib/balsamique.rb', line 65

# Encodes an integer as a base-36 string.
def self.enc36(i)
  i.to_s(36)
end
.enc36_slice_timestamp(ts) ⇒ Object
# File 'lib/balsamique.rb', line 71

# Slices a timestamp into (chunk, slice) and base-36-encodes both parts,
# for use in questats keys.
def self.enc36_slice_timestamp(ts)
  self.slice_timestamp(ts).map { |i| self.enc36(i) }
end
.next_task(tasks) ⇒ Object
# File 'lib/balsamique.rb', line 42

# Name of the next task still to run: the first entry in the task list
# with no result yet (size == 1), or nil when all tasks are done.
def self.next_task(tasks)
  pending = tasks.find { |entry| entry.size == 1 }
  pending && pending.first
end
.slice_timestamp(ts) ⇒ Object
# File 'lib/balsamique.rb', line 61

# Splits an epoch timestamp into stats coordinates: returns
# [chunk, slice] where a slice is STATS_SLICE seconds and a chunk is
# STATS_CHUNK slices.
def self.slice_timestamp(ts)
  slice = ts.to_i / STATS_SLICE
  return slice / STATS_CHUNK, slice % STATS_CHUNK
end
.strip_prefix(str, prefix) ⇒ Object
# File 'lib/balsamique.rb', line 52

# Returns str with prefix removed, or nil when str does not start with
# prefix.
def self.strip_prefix(str, prefix)
  plen = prefix.size
  if str[0, plen] == prefix
    str[plen, str.size - plen]
  end
end
Instance Method Details
#accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i) ⇒ Object
# File 'lib/balsamique.rb', line 289

# Pulls all members of the named queue scheduled at or after timestamp
# forward to run immediately, spaced incr seconds apart.  Returns the
# number of rescheduled members.
def accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i)
  redis_eval(ACCELERATE_RETRIES_SHA, ACCELERATE_RETRIES,
             [@que_prefix + queue], [incr, timestamp])
end
#complete_report(id) ⇒ Object
# File 'lib/balsamique.rb', line 548

# Acknowledges the report for a job id, clearing its retry count and
# removing it from the report queue.
def complete_report(id)
  redis_eval(REPORT_COMPLETE_SHA, REPORT_COMPLETE,
             [@report_queue, @retries], [id])
end
#decode_job_status(status) ⇒ Object
# File 'lib/balsamique.rb', line 409

# Parses a status string 'queue,ts[,retry_i,retry_ts]...' into the queue
# name and a timestamps array: index 0 is the enqueue/transition time,
# and each (retry_i, retry_ts) pair places retry_ts at index retry_i.
def decode_job_status(status)
  queue, ts, *retries = status.split(',')
  ts = ts.to_f
  timestamps = [ts]
  while retries.size > 0
    i = retries.shift.to_i
    timestamps[i] = retries.shift.to_f
  end
  return queue, timestamps
end
#delete_queue(queue) ⇒ Object
# File 'lib/balsamique.rb', line 345

# Deletes a queue's sorted set and its registry entry atomically.
# Returns true iff the queue was registered in the queues hash.
def delete_queue(queue)
  qkey = @que_prefix + queue.to_s
  replies = redis.multi do |r|
    r.del(qkey)
    r.hdel(@queues, qkey)
  end
  replies.last == 1
end
#dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f) ⇒ Object
# File 'lib/balsamique.rb', line 171

# Attempts to pop a ready job from the first of the given task queues
# that has one (score < timestamp).  On success returns a hash with
# :id, :args, :tasks (JSON-decoded) and :retries; returns nil when no
# queue has a ready job.  The popped job is simultaneously rescheduled
# retry_delay * 2^retries in the future as a crash guard.
def dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f)
  # Stats are bucketed by base-36 (chunk, slice) of the dequeue time.
  stats_chunk, stats_slice = self.class.enc36_slice_timestamp(timestamp)
  questats_key = @questats_prefix + stats_chunk
  keys = [@args, @tasks, questats_key, @retries]
  tasks.each { |task| keys << @que_prefix + task.to_s }
  result = redis_eval(
    DEQUEUE_TASK_SHA, DEQUEUE_TASK, keys,
    [timestamp, retry_delay, stats_slice])
  if result
    id, args, tasks, retries = result
    { id: id, args: JSON.parse(args), tasks: JSON.parse(tasks),
      retries: retries }
  end
end
#enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f) ⇒ Object
# File 'lib/balsamique.rb', line 128

# Enqueues a job onto the queue of its first pending task.  Returns
# [success, id]: success is false with a nil id when the task list has
# no pending task, and false with the blocking job's id when a
# uniq_in_flight constraint is already held.
def enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f)
  first_task = self.class.next_task(tasks)
  return false, nil unless first_task

  keys = [@tasks, @args, @status,
          @que_prefix + first_task.to_s, @queues, @unique]
  argv = [tasks.to_json, args.to_json, run_at]
  argv << uniq_in_flight if uniq_in_flight

  # ENQUEUE_JOB returns the new id, or minus the blocking id.
  result_id = redis_eval(ENQUEUE_JOB_SHA, ENQUEUE_JOB, keys, argv)
  return result_id > 0, result_id.abs.to_s
end
#fail(id, task, details, timestamp = Time.now.to_f) ⇒ Object
# File 'lib/balsamique.rb', line 269

# Records a failure of the named task for job id, with JSON-encodable
# details.  Returns true iff the FAIL_TASK script accepted the failure
# (task matched and a retry count existed).
# NOTE: intentionally shadows Kernel#fail within this class.
def fail(id, task, details, timestamp = Time.now.to_f)
  keys = [@tasks, @status, @retries, @failz, @failures, @report_queue]
  argv = [id, timestamp, @que_prefix + task, JSON.generate(details)]
  id == redis_eval(FAIL_TASK_SHA, FAIL_TASK, keys, argv)
end
#failures(*args) ⇒ Object
# File 'lib/balsamique.rb', line 341

# Convenience wrapper: fetches failz entries (same arguments as
# #get_failz) and resolves them into full failure records.
def failures(*args)
  get_failures(get_failz(*args))
end
#fill_args_tasks(statuses) ⇒ Object
# File 'lib/balsamique.rb', line 481

# Augments each status hash (keyed by job id) in place with its
# JSON-decoded :args and :tasks, fetched in one MULTI round trip.
def fill_args_tasks(statuses)
  ids = statuses.keys
  args_list, tasks_list = redis.multi do |r|
    r.hmget(@args, ids)
    r.hmget(@tasks, ids)
  end
  ids.zip(args_list, tasks_list).each do |id, a, t|
    statuses[id][:args]  = a && JSON.parse(a)
    statuses[id][:tasks] = t && JSON.parse(t)
  end
end
#fill_job_failures(statuses) ⇒ Object
# File 'lib/balsamique.rb', line 464

# Augments each status hash with its :failures, derived from the
# retry timestamps recorded in the status (entries beyond index 0).
# Returns the statuses hash.
def fill_job_failures(statuses)
  failz = {}
  statuses.each do |id, status|
    next unless (task = status[:task])
    timestamps = status[:timestamps]
    next unless timestamps.size > 1
    queue = @que_prefix + task
    # Retry i (1-based) keys its failure record as "id,queue,i".
    timestamps.drop(1).each_with_index do |ts, i|
      failz["#{id},#{queue},#{i + 1}"] = ts
    end
  end
  get_failures(failz).each do |id, failures|
    statuses[id][:failures] = failures
  end
  statuses
end
#get_env(topic, keys = nil) ⇒ Object
# File 'lib/balsamique.rb', line 570

# Reads environment values for a topic.  With keys nil the whole hash
# is returned; with an explicit key list, only those keys (missing ones
# map to nil); an empty key list short-circuits to {}.
def get_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.hgetall(hkey) if keys.nil?
  return {} if keys.empty?

  values = redis.hmget(hkey, keys)
  result = {}
  keys.zip(values).each { |k, v| result[k] = v }
  result
end
#get_failures(failz) ⇒ Object
# File 'lib/balsamique.rb', line 310

# Resolves failz entries ("id,queue,retries" => ts) into failure
# records grouped by job id.  Each record carries :task, :retries, :ts
# and the JSON-decoded :details (nil when no details were stored).
def get_failures(failz)
  result = Hash.new { Array.new }
  fkeys = failz.keys
  unless fkeys.empty?
    details_list = redis.hmget(@failures, fkeys)
    fkeys.zip(details_list).each do |key, details|
      id, queue, r = key.split(',')
      result[id] <<= {
        task: self.class.strip_prefix(queue, @que_prefix),
        retries: r.to_i,
        ts: failz[key],
        details: details && JSON.parse(details) }
    end
  end
  result
end
#get_failz(earliest = 0, latest = Time.now.to_f, limit = -100)) ⇒ Object
# File 'lib/balsamique.rb', line 327

# Fetches failure-schedule entries between earliest and latest as a
# hash of member => score.  A negative limit returns the |limit| most
# recent entries (descending scan); a positive limit returns the
# earliest ones.
def get_failz(earliest = 0, latest = Time.now.to_f, limit = -100)
  values =
    if limit < 0
      redis.zrevrangebyscore(
        @failz, latest, earliest, limit: [0, -limit], with_scores: true)
    else
      redis.zrangebyscore(
        @failz, earliest, latest, limit: [0, limit], with_scores: true)
    end
  values.each_with_object({}) { |(member, score), acc| acc[member] = score }
end
#job_status(*ids) ⇒ Object
# File 'lib/balsamique.rb', line 451

# Fetches and decodes the status of each given job id.  Returns a hash
# of id => { task:, timestamps: }; ids with no stored status are
# omitted.
def job_status(*ids)
  statuses = redis.hmget(@status, *ids)
  result = {}
  ids.zip(statuses).each do |(id, status)|
    next unless status
    queue, timestamps = decode_job_status(status)
    result[id] = {
      task: self.class.strip_prefix(queue, @que_prefix),
      timestamps: timestamps }
  end
  result
end
#pop_report(timestamp = Time.now.to_f) ⇒ Object
# File 'lib/balsamique.rb', line 535

# Conditionally pops the report queue: returns [id, ts, retries] when
# the head is due before timestamp (rescheduling it with backoff), or
# nil when nothing is due.
def pop_report(timestamp = Time.now.to_f)
  result = redis_eval(
    REPORT_POP_SHA, REPORT_POP,
    [@report_queue, @retries], [timestamp, REPORT_RETRY_DELAY])
  result &&= JSON.parse(result)
end
#push_report(id, timestamp = Time.now.to_f) ⇒ Object
# File 'lib/balsamique.rb', line 513

# Schedules job id for reporting at timestamp, resetting any previous
# report retry count in the same MULTI transaction.
def push_report(id, timestamp = Time.now.to_f)
  redis.multi do |r|
    r.hdel(@retries, "#{id},#{@report_queue}")
    r.zadd(@report_queue, timestamp, id)
  end
end
#put_env(topic, h) ⇒ Object
# File 'lib/balsamique.rb', line 553

# Writes the key/value pairs of h into the topic's env hash.  Returns
# true on success; a no-op (returning nil) when h is empty.
def put_env(topic, h)
  return if h.empty?
  flat = h.to_a.flatten(1)
  hkey = @env_prefix + topic.to_s
  'OK' == redis.hmset(hkey, *flat)
end
#queue_length(queue) ⇒ Object
# File 'lib/balsamique.rb', line 390

# Number of jobs (ready or scheduled) in the named queue.
def queue_length(queue)
  redis.zcard(@que_prefix + queue) || 0
end
#queue_peek(queue, n = 1000) ⇒ Object
# File 'lib/balsamique.rb', line 394

# Returns up to n head entries of the named queue as a hash of
# id => { ts:, retries: }, without popping anything.
def queue_peek(queue, n = 1000)
  qkey = @que_prefix + queue
  result = {}
  redis.zrange(qkey, 0, n - 1, with_scores: true).each do |(id, score)|
    result[id] = { ts: score }
  end
  return result if result.empty?

  retry_keys = result.keys.map { |id| "#{id},#{qkey}" }
  retries = redis.hmget(@retries, retry_keys)
  result.keys.zip(retries).each do |id, r|
    result[id][:retries] = r.to_i
  end
  result
end
#queue_stats(chunks = 3, latest = Time.now.to_f) ⇒ Object
# File 'lib/balsamique.rb', line 493

# Collects per-queue stats from the most recent `chunks` stats chunks,
# as stats[stat][timestamp][queue] = count.  Stat names currently
# written by DEQUEUE_TASK are 'len' and 'dq'.
def queue_stats(chunks = 3, latest = Time.now.to_f)
  last_chunk, last_slice = self.class.slice_timestamp(latest)
  stats = {}
  (0..(chunks - 1)).each do |chunk_i|
    chunk_ts = self.class.enc36(last_chunk - chunk_i)
    stats_chunk = redis.hgetall(@questats_prefix + chunk_ts)
    next unless stats_chunk
    stats_chunk.each do |key, val|
      queue, stat, slice = key.split(',')
      queue = self.class.strip_prefix(queue, @que_prefix)
      # Hash fields are "queue,stat,slice36"; rebuild the epoch time.
      ts = self.class.dec36_assemble_timestamp(chunk_ts, slice)
      stats[stat] = {} unless stats[stat]
      stats[stat][ts] = {} unless stats[stat][ts]
      stats[stat][ts][queue] = val.to_i
    end
  end
  stats
end
#queues ⇒ Object
# File 'lib/balsamique.rb', line 353

# Names of all registered queues, with the key prefix stripped.
def queues
  redis.hgetall(@queues).keys
    .map { |k| self.class.strip_prefix(k, @que_prefix) }
end
#queues_info ⇒ Object
# File 'lib/balsamique.rb', line 358

# Snapshot of every registered queue: last enqueued id/time (from the
# queues hash), head-of-queue id/time, the count of ready jobs, and the
# total size — gathered in a single MULTI round trip.
def queues_info
  qs_info = redis.hgetall(@queues)
  return {} if qs_info.empty?

  now = Time.now.to_f
  details = redis.multi do |r|
    qs_info.keys.each do |key|
      r.zrange(key, 0, 0, withscores: true) # head element + score
      r.zcount(key, 0, now)                 # ready (due) jobs
      r.zcard(key)                          # total jobs
    end
  end

  result = {}
  qs_info.keys.each_with_index do |key, i|
    base = 3 * i
    queue = self.class.strip_prefix(key, @que_prefix)
    last_id, last_ts = qs_info[key].split(',')
    last_ts = last_ts.to_f
    next_id = next_ts = nil
    if (head = details[base].first)
      next_id = head.first
      next_ts = head.last
    end
    result[queue] = {
      current_ts: now,
      last_id: last_id, last_ts: last_ts,
      total: details[base + 2], ready: details[base + 1],
      next_id: next_id, next_ts: next_ts }
  end
  result
end
#rand_delay(delay = RETRY_DELAY) ⇒ Object
# File 'lib/balsamique.rb', line 27

# A jittered retry delay, uniformly distributed in (delay/2, delay],
# so crash-guard reschedules don't synchronize.
def rand_delay(delay = RETRY_DELAY)
  delay - 0.5 * rand() * delay
end
#redis ⇒ Object
# File 'lib/balsamique.rb', line 31

# The underlying Redis connection supplied to #initialize.
def redis
  @redis
end
#redis_eval(cmd_sha, cmd, keys, argv) ⇒ Object
# File 'lib/balsamique.rb', line 35

# Runs a Lua script by SHA via EVALSHA, falling back to a plain EVAL
# (which also loads the script into the server cache) when the server
# doesn't know the SHA yet.
def redis_eval(cmd_sha, cmd, keys, argv)
  redis.evalsha(cmd_sha, keys, argv)
rescue Redis::CommandError
  puts "[INFO] Balsamique falling back to EVAL for #{cmd_sha}"
  redis.eval(cmd, keys, argv)
end
#remove_job(id) ⇒ Object
# File 'lib/balsamique.rb', line 420

# Removes every trace of a job: its queue membership, retry counts,
# failure records, args, tasks, status, and uniqueness lock.  If the
# status changed concurrently during cleanup, retries from the top so
# the newly-written state is also cleaned up.
def remove_job(id)
  return unless (status = redis.hget(@status, id))
  queue, timestamps = decode_job_status(status)
  redis.multi do |r|
    if queue.start_with?(@que_prefix)
      r.zrem(queue, id)
      rkey = "#{id},#{queue}"
      r.hdel(@retries, rkey)
      # One failure record per recorded retry timestamp (index >= 1).
      rkeys = []
      timestamps.drop(1).each_with_index do |ts, i|
        rkeys << rkey + ",#{i + 1}"
      end
      if rkeys.size > 0
        r.hdel(@failures, rkeys)
        r.zrem(@failz, rkeys)
      end
    end
    r.hdel(@args, id)
    r.hdel(@tasks, id)
  end
  return unless (check_status = redis.hget(@status, id))
  if check_status == status
    redis.hdel(@status, id)
    if (uid = redis.hget(@unique, id))
      redis.hdel(@unique, [id, uid])
    end
  else
    # Status moved under us; clean up the new state too.
    remove_job(id)
  end
end
#reschedule(queue, ids, timestamp = Time.now.to_f) ⇒ Object
# File 'lib/balsamique.rb', line 305

# Resets the score of each given id (that is still present in the named
# queue) to timestamp, typically to force immediate retry.
def reschedule(queue, ids, timestamp = Time.now.to_f)
  redis_eval(RESCHEDULE_SHA, RESCHEDULE,
             [@que_prefix + queue], [timestamp] + ids)
end
#rm_env(topic, keys = nil) ⇒ Object
# File 'lib/balsamique.rb', line 561

# Deletes env data for a topic: the whole hash when keys is nil, the
# listed fields otherwise.  An empty key list is a no-op.
def rm_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  if keys.nil?
    redis.del(hkey)
  elsif !keys.empty?
    redis.hdel(hkey, keys)
  end
end
#set_id_floor(id_floor) ⇒ Object
# File 'lib/balsamique.rb', line 89

# Ensures subsequently assigned job ids are greater than id_floor.
def set_id_floor(id_floor)
  redis_eval(SET_ID_FLOOR_SHA, SET_ID_FLOOR, [@unique], [id_floor.to_i])
end
#succeed(id, tasks, timestamp = Time.now.to_f) ⇒ Object
# File 'lib/balsamique.rb', line 228

# Marks the current task of job id as succeeded at timestamp, storing
# the updated task list and advancing the job to its next task's queue
# (when one remains).  Returns true iff the SUCCEED_TASK script
# accepted the transition.
def succeed(id, tasks, timestamp = Time.now.to_f)
  current_task = self.class.current_task(tasks)
  next_task = self.class.next_task(tasks)
  keys = [
    @tasks, @status, @retries, @report_queue, @failz, @failures,
    @que_prefix + current_task]
  argv = [id, timestamp, tasks.to_json]
  # Optional KEYS[8]/KEYS[9] trigger the advance-to-next-queue branch.
  keys << (@que_prefix + next_task) << @queues if next_task
  id == redis_eval(SUCCEED_TASK_SHA, SUCCEED_TASK, keys, argv)
end