Class: Sidekiq::Scheduled::Enq

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/scheduled.rb

Instance Method Summary collapse

Instance Method Details

#enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/sidekiq/scheduled.rb', line 12

def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get next items in the queue with scores (time to execute) <= now.
      until (jobs = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 100])).empty?
        # We need to go through the list one at a time to reduce the risk of something
        # going wrong between the time jobs are popped from the scheduled queue and when
        # they are pushed onto a work queue and losing the jobs.
        jobs.each do |job|
          # Pop item off the queue and add it to the work queue. If the job can't be popped from
          # the queue, it's because another process already popped it so we can move on to the
          # next one.
          if conn.zrem(sorted_set, job)
            Sidekiq::Client.push(Sidekiq.load_json(job))
            Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
          end
        end
      end
    end
  end
end