Class: Pallets::Backends::Redis

Inherits:
Base
  • Object
show all
Defined in:
lib/pallets/backends/redis.rb

Constant Summary collapse

QUEUE_KEY =
'queue'
RELIABILITY_QUEUE_KEY =
'reliability-queue'
RELIABILITY_SET_KEY =
'reliability-set'
RETRY_SET_KEY =
'retry-set'
GIVEN_UP_SET_KEY =
'given-up-set'
WORKFLOW_QUEUE_KEY =
'workflow-queue:%s'
CONTEXT_KEY =
'context:%s'
REMAINING_KEY =
'remaining:%s'

Instance Method Summary collapse

Constructor Details

#initialize(blocking_timeout:, failed_job_lifespan:, job_timeout:, pool_size:, **options) ⇒ Redis

Returns a new instance of Redis.



15
16
17
18
19
20
21
22
# File 'lib/pallets/backends/redis.rb', line 15

# Builds the backend: stores the timing settings, creates the client pool and
# registers the scripts.
#
# @param blocking_timeout [Numeric] stored and later passed as the +timeout:+
#   of the blocking pop in #pick
# @param failed_job_lifespan [Numeric] stored and later subtracted from the
#   current time in #give_up
# @param job_timeout [Numeric] stored and later added to the current time in
#   #pick
# @param pool_size [Integer] handed to Pallets::Pool.new
# @param options [Hash] any remaining keywords, forwarded as a single Hash to
#   ::Redis.new for every pooled client
def initialize(blocking_timeout:, failed_job_lifespan:, job_timeout:, pool_size:, **options)
  @blocking_timeout, @failed_job_lifespan, @job_timeout =
    blocking_timeout, failed_job_lifespan, job_timeout

  # The block is what the pool uses to build each client.
  @pool = Pallets::Pool.new(pool_size) do
    ::Redis.new(options)
  end

  # NOTE(review): register_scripts is defined outside this view; presumably it
  # populates @scripts, which the other methods read -- confirm.
  register_scripts
end

Instance Method Details

#get_context(wfid) ⇒ Object



38
39
40
41
42
# File 'lib/pallets/backends/redis.rb', line 38

# Reads the stored context of a workflow.
#
# @param wfid [Object] workflow identifier, interpolated into CONTEXT_KEY
# @return [Object] whatever the pooled client's +hgetall+ returns for that key
def get_context(wfid)
  context_key = CONTEXT_KEY % wfid
  @pool.execute { |redis| redis.hgetall(context_key) }
end

#give_up(job, old_job) ⇒ Object



64
65
66
67
68
69
70
71
72
# File 'lib/pallets/backends/redis.rb', line 64

# Runs the 'give_up' script for a job.
#
# The script receives the given-up set, reliability queue and reliability set
# keys, plus four arguments: the current time, the new job payload, the old
# job payload, and the current time minus the configured failed job lifespan.
# NOTE(review): the script body is not visible here; presumably the last
# argument is the cutoff below which given-up entries are discarded -- confirm.
#
# @param job [Object] job payload passed to the script as the new value
# @param old_job [Object] job payload passed to the script as the old value
# @return [Object] whatever the client's +evalsha+ returns
def give_up(job, old_job)
  @pool.execute do |client|
    # Read the clock once so the score and the cutoff are derived from the
    # same instant and always differ by exactly @failed_job_lifespan.
    now = Time.now.to_f

    client.evalsha(
      @scripts['give_up'],
      [GIVEN_UP_SET_KEY, RELIABILITY_QUEUE_KEY, RELIABILITY_SET_KEY],
      [now, job, old_job, now - @failed_job_lifespan]
    )
  end
end

#pick ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/pallets/backends/redis.rb', line 24

# Takes the next job off the main queue, moving it to the reliability queue.
#
# Blocks for up to @blocking_timeout via the client's +brpoplpush+.
#
# @return [Object, nil] the value returned by +brpoplpush+ (falsy when no job
#   was obtained)
def pick
  @pool.execute do |redis|
    redis.brpoplpush(QUEUE_KEY, RELIABILITY_QUEUE_KEY, timeout: @blocking_timeout).tap do |popped|
      # Record when the job times out so that jobs lingering on the
      # reliability queue can be retried later. This is a second, separate
      # command because BRPOPLPUSH cannot atomically move an element from the
      # main queue into a sorted set.
      if popped
        deadline = Time.now.to_f + @job_timeout
        redis.zadd(RELIABILITY_SET_KEY, deadline, popped)
      end
    end
  end
end

#reschedule_all(earlier_than) ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/pallets/backends/redis.rb', line 74

# Runs the 'reschedule_all' script.
#
# The script is given the reliability set, reliability queue, retry set and
# main queue keys, with +earlier_than+ as its only argument.
# NOTE(review): the script body is not visible here; presumably it re-queues
# entries scored before +earlier_than+ -- confirm.
#
# @param earlier_than [Object] passed to the script unchanged
# @return [Object] whatever the client's +evalsha+ returns
def reschedule_all(earlier_than)
  script_keys = [RELIABILITY_SET_KEY, RELIABILITY_QUEUE_KEY, RETRY_SET_KEY, QUEUE_KEY]

  @pool.execute do |redis|
    redis.evalsha(@scripts['reschedule_all'], script_keys, [earlier_than])
  end
end

#retry(job, old_job, at) ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/pallets/backends/redis.rb', line 54

# Runs the 'retry' script for a job.
#
# The script is given the retry set, reliability queue and reliability set
# keys, with +at+, +job+ and +old_job+ as its arguments (in that order).
#
# @param job [Object] job payload passed to the script as the new value
# @param old_job [Object] job payload passed to the script as the old value
# @param at [Object] passed as the first script argument
#   (NOTE(review): presumably the timestamp at which to retry -- confirm)
# @return [Object] whatever the client's +evalsha+ returns
def retry(job, old_job, at)
  script_keys = [RETRY_SET_KEY, RELIABILITY_QUEUE_KEY, RELIABILITY_SET_KEY]
  script_args = [at, job, old_job]

  @pool.execute do |redis|
    redis.evalsha(@scripts['retry'], script_keys, script_args)
  end
end

#run_workflow(wfid, jobs_with_order, context_buffer) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/pallets/backends/redis.rb', line 84

# Starts a workflow: runs the 'run_workflow' script and, when there is any
# context to store, writes it to the workflow's context hash. Both commands
# are issued inside a single MULTI block.
#
# @param wfid [Object] workflow identifier, interpolated into the
#   workflow-queue, remaining and context keys
# @param jobs_with_order [Array] passed unchanged as the script's arguments
# @param context_buffer [#empty?, #to_a] context pairs; skipped when empty
# @return [Object] whatever the client's +multi+ returns
def run_workflow(wfid, jobs_with_order, context_buffer)
  @pool.execute do |client|
    # Issue the commands on the object yielded by #multi rather than on the
    # outer client: recent redis-rb versions only queue commands sent to the
    # yielded transaction, while older versions yield the client itself, so
    # this form works with both.
    client.multi do |transaction|
      transaction.evalsha(
        @scripts['run_workflow'],
        [WORKFLOW_QUEUE_KEY % wfid, QUEUE_KEY, REMAINING_KEY % wfid],
        jobs_with_order
      )
      transaction.hmset(CONTEXT_KEY % wfid, *context_buffer.to_a) unless context_buffer.empty?
    end
  end
end

#save(wfid, job, context_buffer) ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/pallets/backends/redis.rb', line 44

# Runs the 'save' script for a job of the given workflow.
#
# The script is given the workflow-queue, main queue, reliability queue,
# reliability set, context and remaining keys. Its arguments are the context
# pairs followed by the job payload as the last element.
#
# @param wfid [Object] workflow identifier, interpolated into the per-workflow keys
# @param job [Object] job payload, appended after the context pairs
# @param context_buffer [#to_a] context pairs to hand to the script
# @return [Object] whatever the client's +evalsha+ returns
def save(wfid, job, context_buffer)
  @pool.execute do |redis|
    script_keys = [
      WORKFLOW_QUEUE_KEY % wfid,
      QUEUE_KEY,
      RELIABILITY_QUEUE_KEY,
      RELIABILITY_SET_KEY,
      CONTEXT_KEY % wfid,
      REMAINING_KEY % wfid
    ]
    script_args = context_buffer.to_a.push(job)

    redis.evalsha(@scripts['save'], script_keys, script_args)
  end
end