Class: Sidekiq::Scheduled::Enq
- Inherits: Object
  - Object
    - Sidekiq::Scheduled::Enq
- Defined in: lib/sidekiq/scheduled.rb
Constant Summary
- LUA_ZPOPBYSCORE =
    <<~LUA
      local key, now = KEYS[1], ARGV[1]
      local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
      if jobs[1] then
        redis.call("zrem", key, jobs[1])
        return jobs[1]
      end
    LUA
Instance Method Summary
- #enqueue_jobs(sorted_sets = SETS) ⇒ Object
- #initialize ⇒ Enq (constructor): A new instance of Enq.
- #terminate ⇒ Object
Constructor Details
#initialize ⇒ Enq
Returns a new instance of Enq.
# File 'lib/sidekiq/scheduled.rb', lines 21-24

def initialize
  @done = false
  @lua_zpopbyscore_sha = nil
end
Instance Method Details
#enqueue_jobs(sorted_sets = SETS) ⇒ Object
# File 'lib/sidekiq/scheduled.rb', lines 26-41

def enqueue_jobs(sorted_sets = SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get next item in the queue with score (time to execute) <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
        Sidekiq::Client.push(Sidekiq.load_json(job))
        Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
      end
    end
  end
end
#terminate ⇒ Object
# File 'lib/sidekiq/scheduled.rb', lines 43-45

def terminate
  @done = true
end