Class: SidekiqFastEnq
- Inherits: Object
- Defined in: lib/sidekiq-fast-enq.rb
Overview
Implementation of the Sidekiq::Scheduled::Enq class that uses a server-side Lua script to atomically fetch the next scheduled job that is due and remove it from the scheduled sorted set. This works much better in large Sidekiq deployments with many processes because it eliminates race conditions between processes checking the scheduled queues.
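The atomicity described above can be illustrated without Redis. The following is a minimal in-memory sketch, not the gem's Lua script: `ToySortedSet` and `pop_due` are hypothetical names, and a Ruby `Mutex` stands in for the guarantee that a server-side script runs without interleaving. The point it shows is that the "is the first job due?" check and the removal happen in one step, so two workers can never claim the same job.

```ruby
# Hypothetical in-memory stand-in for a Redis sorted set (illustration only).
class ToySortedSet
  def initialize
    @entries = [] # [score, member] pairs, kept sorted by score
    @lock = Mutex.new
  end

  def add(score, member)
    @lock.synchronize do
      @entries << [score, member]
      @entries.sort_by!(&:first)
    end
  end

  # Check and remove inside one critical section, so no other caller can
  # see the job between the "is it due?" test and the removal.
  def pop_due(now)
    @lock.synchronize do
      score, member = @entries.first
      return nil if score.nil? || score > now

      @entries.shift
      member
    end
  end
end

# Four threads compete for the jobs that are due at time 49.0.
set = ToySortedSet.new
100.times { |i| set.add(i.to_f, "job-#{i}") }

popped = Queue.new
workers = 4.times.map do
  Thread.new do
    while (job = set.pop_due(49.0))
      popped << job
    end
  end
end
workers.each(&:join)
```

Each due job ends up in `popped` exactly once, regardless of how the threads interleave, and jobs scheduled after the cutoff stay in the set.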
Constant Summary
- DEFAULT_BATCH_SIZE = 1000
Instance Method Summary
- #enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = nil) ⇒ Object
- #initialize(batch_size = nil) ⇒ SidekiqFastEnq (constructor)
  A new instance of SidekiqFastEnq.
Constructor Details
#initialize(batch_size = nil) ⇒ SidekiqFastEnq
Returns a new instance of SidekiqFastEnq.
# File 'lib/sidekiq-fast-enq.rb', line 10

def initialize(batch_size = nil)
  batch_size ||= (Sidekiq.options[:fast_enq_batch_size] || DEFAULT_BATCH_SIZE)
  @script = lua_script(batch_size)
  Sidekiq.redis do |conn|
    @script_sha_1 = conn.script(:load, @script)
  end
end
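The constructor resolves the batch size in a fixed order: an explicit argument wins, then a configured `:fast_enq_batch_size` value, then `DEFAULT_BATCH_SIZE`. A minimal sketch of that precedence follows; `resolve_batch_size` is a hypothetical helper, and the plain `options` hash is an assumption standing in for wherever Sidekiq keeps its configuration.

```ruby
DEFAULT_BATCH_SIZE = 1000

# Hypothetical helper (not part of the gem) mirroring the fallback chain.
# `options` is an assumed stand-in for Sidekiq's configuration hash.
def resolve_batch_size(explicit, options = {})
  explicit || options[:fast_enq_batch_size] || DEFAULT_BATCH_SIZE
end

explicit_size   = resolve_batch_size(250, { fast_enq_batch_size: 500 })
configured_size = resolve_batch_size(nil, { fast_enq_batch_size: 500 })
default_size    = resolve_batch_size(nil)
```

Because the chain uses `||`, only `nil` (or `false`) falls through to the next source; any integer passed explicitly is used as given.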
Instance Method Details
#enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = nil) ⇒ Object
# File 'lib/sidekiq-fast-enq.rb', line 18

def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = nil)
  sorted_sets ||= Sidekiq::Scheduled::SETS
  logger = Sidekiq::Logging.logger

  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    namespace = conn.namespace if conn.respond_to?(:namespace)
    sorted_sets.each do |sorted_set|
      redis_set = (namespace ? "#{namespace}:#{sorted_set}" : sorted_set)
      jobs_count = 0
      start_time = Time.now
      pop_time = 0.0
      enqueue_time = 0.0

      # Get the next item in the queue if its score (time to execute) is <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      loop do
        t = Time.now
        job = pop_job(conn, redis_set, now)
        pop_time += (Time.now - t)
        break if job.nil?

        t = Time.now
        Sidekiq::Client.push(Sidekiq.load_json(job))
        enqueue_time += (Time.now - t)
        jobs_count += 1
        logger.debug("enqueued #{sorted_set}: #{job}") if logger && logger.debug?
      end

      if jobs_count > 0 && logger && logger.info?
        loop_time = Time.now - start_time
        logger.info("SidekiqFastEnq enqueued #{jobs_count} from #{sorted_set} in #{loop_time.round(3)}s (pop: #{pop_time.round(3)}s; enqueue: #{enqueue_time.round(3)}s)")
      end
    end
  end
end
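The pop-then-push loop in the listing above can be sketched in isolation. In this sketch `drain`, `pop`, and `push` are hypothetical names: the two lambdas stand in for `pop_job` and `Sidekiq::Client.push`, so the example runs without Redis or Sidekiq. It also swaps `Time.now` for the monotonic clock when measuring elapsed time, which is a substitution made here and not what the original method does.

```ruby
# Monotonic clock reading in seconds (substituted here for Time.now).
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Moves jobs one at a time until `pop` returns nil, timing each phase.
# `pop` and `push` are hypothetical callables standing in for the real
# scheduled-set pop and work-queue push.
def drain(pop:, push:)
  count = 0
  pop_time = 0.0
  enqueue_time = 0.0

  loop do
    started = monotonic_now
    job = pop.call
    pop_time += monotonic_now - started
    break if job.nil?

    started = monotonic_now
    push.call(job)
    enqueue_time += monotonic_now - started
    count += 1
  end

  { count: count, pop_time: pop_time, enqueue_time: enqueue_time }
end

# Plain arrays stand in for the scheduled set and the work queue.
scheduled = ['{"class":"MailJob"}', '{"class":"ReportJob"}']
work_queue = []

stats = drain(
  pop: -> { scheduled.shift },
  push: ->(job) { work_queue << job }
)
```

Handling one job per iteration means at most one job is in flight between the pop and the push, which is the trade-off the comment in the original method describes.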