Class: Async::Job::Processor::Redis::DelayedJobs
- Inherits:
 - 
      Object
      
        
- Object
 - Async::Job::Processor::Redis::DelayedJobs
 
 
- Defined in:
 - lib/async/job/processor/redis/delayed_jobs.rb
 
Constant Summary collapse
- ADD =
 <<~LUA
   redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
   redis.call('ZADD', KEYS[2], ARGV[3], ARGV[1])
 LUA
- MOVE =
 <<~LUA
   local jobs = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])
   redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
   if #jobs > 0 then
     redis.call('LPUSH', KEYS[2], unpack(jobs))
   end
   return #jobs
 LUA
Instance Attribute Summary collapse
- 
  
    
      #key  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    
Returns the value of attribute key.
 
Instance Method Summary collapse
- #add(job, timestamp, job_store) ⇒ Object
 - 
  
    
      #initialize(client, key)  ⇒ DelayedJobs 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    
A new instance of DelayedJobs.
 - #move(destination:, now: Time.now.to_i) ⇒ Object
 - #start(ready_list, resolution: 10, parent: Async::Task.current) ⇒ Object
 
Constructor Details
#initialize(client, key) ⇒ DelayedJobs
Returns a new instance of DelayedJobs.
      25 26 27 28 29 30 31  | 
    
      # File 'lib/async/job/processor/redis/delayed_jobs.rb', line 25 def initialize(client, key) @client = client @key = key @add = @client.script(:load, ADD) @move = @client.script(:load, MOVE) end  | 
  
Instance Attribute Details
#key ⇒ Object (readonly)
Returns the value of attribute key.
      47 48 49  | 
    
      # File 'lib/async/job/processor/redis/delayed_jobs.rb', line 47 def key @key end  | 
  
Instance Method Details
#add(job, timestamp, job_store) ⇒ Object
      49 50 51 52 53 54 55  | 
    
      # File 'lib/async/job/processor/redis/delayed_jobs.rb', line 49 def add(job, timestamp, job_store) id = SecureRandom.uuid @client.evalsha(@add, 2, job_store.key, @key, id, job, timestamp.to_f) return id end  | 
  
#move(destination:, now: Time.now.to_i) ⇒ Object
      57 58 59  | 
    
      # File 'lib/async/job/processor/redis/delayed_jobs.rb', line 57 def move(destination:, now: Time.now.to_i) @client.evalsha(@move, 2, @key, destination, now) end  | 
  
#start(ready_list, resolution: 10, parent: Async::Task.current) ⇒ Object
      33 34 35 36 37 38 39 40 41 42 43 44 45  | 
    
      # File 'lib/async/job/processor/redis/delayed_jobs.rb', line 33 def start(ready_list, resolution: 10, parent: Async::Task.current) parent.async do while true count = move(destination: ready_list.key) if count > 0 Console.debug(self, "Moved #{count} delayed jobs to ready list.") end sleep(resolution) end end end  |