Class: Async::Job::Processor::Redis::DelayedJobs

Inherits:
Object
  • Object
show all
Defined in:
lib/async/job/processor/redis/delayed_jobs.rb

Constant Summary collapse

# Lua script that records a job and schedules it.
#
# KEYS[1] - hash that receives the job: field ARGV[1] is set to value ARGV[2] (HSET).
# KEYS[2] - sorted set that receives ARGV[1] as a member with score ARGV[3] (ZADD).
#
# As invoked by #add: ARGV[1] is the generated job id, ARGV[2] the job payload and
# ARGV[3] the timestamp converted to a float.
ADD =
<<~LUA
	redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
	redis.call('ZADD', KEYS[2], ARGV[3], ARGV[1])
LUA
# Lua script that moves every due job id from the delayed set to a list.
#
# KEYS[1] - sorted set of delayed job ids, scored by their due timestamp.
# KEYS[2] - list that receives the due ids (LPUSH).
# ARGV[1] - cutoff score; members scored between 0 and ARGV[1] are moved.
#
# Returns the number of ids that were moved.
#
# The ids are pushed in batches of at most 1000: Lua's `unpack` can only expand a
# limited number of values, so expanding a very large backlog in one call would
# raise after the ids had already been removed from the sorted set. Pushing
# consecutive batches with LPUSH yields the same final list order as one big LPUSH.
MOVE =
<<~LUA
	local jobs = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])
	redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
	for i = 1, #jobs, 1000 do
		redis.call('LPUSH', KEYS[2], unpack(jobs, i, math.min(i + 999, #jobs)))
	end
	return #jobs
LUA

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, key) ⇒ DelayedJobs

Returns a new instance of DelayedJobs.



25
26
27
28
29
30
31
# File 'lib/async/job/processor/redis/delayed_jobs.rb', line 25

# Create the delayed job store and register its Lua scripts with the client.
#
# @param client [#script, #evalsha] client used for all script calls.
# @param key [Object] key passed to the scripts as the delayed job sorted set.
def initialize(client, key)
	@client, @key = client, key
	
	# Load the scripts once; the returned handles are what #add and #move pass to
	# `evalsha` (presumably SHA1 digests from Redis `SCRIPT LOAD` - confirm against the client).
	@add = client.script(:load, ADD)
	@move = client.script(:load, MOVE)
end

Instance Attribute Details

#key ⇒ Object (readonly)

Returns the value of attribute key.



47
48
49
# File 'lib/async/job/processor/redis/delayed_jobs.rb', line 47

# The key this instance was constructed with; it is handed to the Lua scripts
# as the delayed job sorted set.
#
# @return [Object] the value stored in `@key`.
def key
  return @key
end

Instance Method Details

#add(job, timestamp, job_store) ⇒ Object



49
50
51
52
53
54
55
# File 'lib/async/job/processor/redis/delayed_jobs.rb', line 49

# Store a job and schedule it under a freshly generated id.
#
# Runs the ADD script with the job store's key and this instance's key as KEYS,
# and the id, the job and the float timestamp as ARGV.
#
# @param job [Object] payload, passed to the script unchanged.
# @param timestamp [#to_f] due time; converted with `to_f` before being sent.
# @param job_store [#key] object whose `key` names the hash that receives the payload.
# @return [String] the generated UUID identifying the job.
def add(job, timestamp, job_store)
	SecureRandom.uuid.tap do |id|
		keys = [job_store.key, @key]
		
		@client.evalsha(@add, keys.size, *keys, id, job, timestamp.to_f)
	end
end

#move(destination:, now: Time.now.to_i) ⇒ Object



57
58
59
# File 'lib/async/job/processor/redis/delayed_jobs.rb', line 57

# Run the MOVE script once, transferring due job ids into the destination list.
#
# @param destination [Object] key of the list that receives the due ids.
# @param now [Integer] cutoff score handed to the script; defaults to the current
#   time in whole seconds.
# @return [Object] whatever `evalsha` returns; the script itself returns the number of ids moved.
def move(destination:, now: Time.now.to_i)
	keys = [@key, destination]
	
	@client.evalsha(@move, keys.size, *keys, now)
end

#start(ready_list, resolution: 10, parent: Async::Task.current) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/async/job/processor/redis/delayed_jobs.rb', line 33

# Spawn a child task that repeatedly moves due jobs into the ready list.
#
# Each cycle calls #move with the ready list's key, logs a debug message when at
# least one job was moved, then sleeps for `resolution` before the next cycle.
# The loop has no exit condition of its own.
#
# @param ready_list [#key] object whose `key` names the destination list.
# @param resolution [Numeric] argument given to `sleep` between cycles (default 10).
# @param parent [#async] task used to spawn the loop; defaults to the current Async task.
# @return [Object] the result of `parent.async`.
def start(ready_list, resolution: 10, parent: Async::Task.current)
	parent.async do
		while true
			count = move(destination: ready_list.key)
			Console.debug(self, "Moved #{count} delayed jobs to ready list.") if count > 0
			
			sleep(resolution)
		end
	end
end