Module: Sidekiq::Worker::ClassMethods

Defined in:
lib/sidekiq/testing.rb,
lib/sidekiq/worker.rb

Overview

The Sidekiq testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.

This is similar to ActionMailer’s :test delivery_method and its ActionMailer::Base.deliveries array.

Example:

require 'sidekiq/testing'

assert_equal 0, HardWorker.jobs.size
HardWorker.perform_async(:something)
assert_equal 1, HardWorker.jobs.size
assert_equal :something, HardWorker.jobs[0]['args'][0]

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size

You can also clear and drain all workers’ jobs:

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

MyMailer.delay.send_welcome_email('[email protected]')
MyModel.delay.do_something_hard

assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size

Sidekiq::Worker.clear_all # or .drain_all

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

This can be useful to make sure jobs don’t linger between tests:

RSpec.configure do |config|
  config.before(:each) do
    Sidekiq::Worker.clear_all
  end
end

or for acceptance testing, i.e. with cucumber:

AfterStep do
  Sidekiq::Worker.drain_all
end

When I sign up as "[email protected]"
Then I should receive a welcome email to "[email protected]"

Instance Method Summary collapse

Instance Method Details

#clearObject

Clear all jobs for this worker



269
270
271
# File 'lib/sidekiq/testing.rb', line 269

def clear
  Queues.clear_for(queue, to_s)
end

#client_push(item) ⇒ Object

:nodoc:



356
357
358
359
360
361
# File 'lib/sidekiq/worker.rb', line 356

def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
  stringified_item = item.transform_keys(&:to_s)

  Sidekiq::Client.new(pool).push(stringified_item)
end

#delay(*args) ⇒ Object

Raises:

  • (ArgumentError)


266
267
268
# File 'lib/sidekiq/worker.rb', line 266

def delay(*args)
  raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async"
end

#delay_for(*args) ⇒ Object

Raises:

  • (ArgumentError)


270
271
272
# File 'lib/sidekiq/worker.rb', line 270

def delay_for(*args)
  raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in"
end

#delay_until(*args) ⇒ Object

Raises:

  • (ArgumentError)


274
275
276
# File 'lib/sidekiq/worker.rb', line 274

def delay_until(*args)
  raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
end

#drainObject

Drain and run all jobs for this worker



274
275
276
277
278
279
280
# File 'lib/sidekiq/testing.rb', line 274

def drain
  while jobs.any?
    next_job = jobs.first
    Queues.delete_for(next_job["jid"], next_job["queue"], to_s)
    process_job(next_job)
  end
end

#execute_job(worker, args) ⇒ Object



299
300
301
# File 'lib/sidekiq/testing.rb', line 299

def execute_job(worker, args)
  worker.perform(*args)
end

#jobsObject

Jobs queued for this worker



264
265
266
# File 'lib/sidekiq/testing.rb', line 264

def jobs
  Queues.jobs_by_worker[to_s]
end

#perform_async(*args) ⇒ Object



286
287
288
# File 'lib/sidekiq/worker.rb', line 286

def perform_async(*args)
  Setter.new(self, {}).perform_async(*args)
end

#perform_bulk(items, batch_size: 1_000) ⇒ Object

Push a large number of jobs to Redis, while limiting the batch of each job payload to 1,000. This method helps cut down on the number of round trips to Redis, which can increase the performance of enqueueing large numbers of jobs.

items must be an Array of Arrays.

For finer-grained control, use ‘Sidekiq::Client.push_bulk` directly.

Example (3 Redis round trips):

SomeWorker.perform_async(1)
SomeWorker.perform_async(2)
SomeWorker.perform_async(3)

Would instead become (1 Redis round trip):

SomeWorker.perform_bulk([[1], [2], [3]])


315
316
317
318
319
320
321
# File 'lib/sidekiq/worker.rb', line 315

def perform_bulk(items, batch_size: 1_000)
  result = items.each_slice(batch_size).flat_map do |slice|
    Sidekiq::Client.push_bulk("class" => self, "args" => slice)
  end

  result.is_a?(Enumerator::Lazy) ? result.force : result
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, numeric or something that acts

numeric (like an activesupport time interval).


325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/sidekiq/worker.rb', line 325

def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)

  item = {"class" => self, "args" => args}

  # Optimization to enqueue something now that is scheduled to go out now or in the past
  item["at"] = ts if ts > now

  client_push(item)
end

#perform_inline(*args) ⇒ Object

Inline execution of job’s perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware



291
292
293
# File 'lib/sidekiq/worker.rb', line 291

def perform_inline(*args)
  Setter.new(self, {}).perform_inline(*args)
end

#perform_oneObject

Pop out a single job and perform it

Raises:



283
284
285
286
287
288
# File 'lib/sidekiq/testing.rb', line 283

def perform_one
  raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
  next_job = jobs.first
  Queues.delete_for(next_job["jid"], queue, to_s)
  process_job(next_job)
end

#process_job(job) ⇒ Object



290
291
292
293
294
295
296
297
# File 'lib/sidekiq/testing.rb', line 290

def process_job(job)
  worker = new
  worker.jid = job["jid"]
  worker.bid = job["bid"] if worker.respond_to?(:bid=)
  Sidekiq::Testing.server_middleware.invoke(worker, job, job["queue"]) do
    execute_job(worker, job["args"])
  end
end

#queueObject

Queue for this worker



259
260
261
# File 'lib/sidekiq/testing.rb', line 259

def queue
  get_sidekiq_options["queue"]
end

#queue_as(q) ⇒ Object



278
279
280
# File 'lib/sidekiq/worker.rb', line 278

def queue_as(q)
  sidekiq_options("queue" => q.to_s)
end

#set(options) ⇒ Object



282
283
284
# File 'lib/sidekiq/worker.rb', line 282

def set(options)
  Setter.new(self, options)
end

#sidekiq_options(opts = {}) ⇒ Object

Allows customization for this type of Worker. Legal options:

queue - use a named queue for this Worker, default 'default'
retry - enable the RetryJobs middleware for this Worker, *true* to use the default
   or *Integer* count
backtrace - whether to save any error backtrace in the retry payload to display in web UI,
   can be true, false or an integer number of lines to save, default *false*
pool - use the given Redis connection pool to push this type of job to a given shard.

In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.



352
353
354
# File 'lib/sidekiq/worker.rb', line 352

def sidekiq_options(opts = {})
  super
end