Module: Sidekiq::Worker::ClassMethods

Defined in:
lib/sidekiq/testing.rb,
lib/sidekiq/worker.rb

Overview

The Sidekiq testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.

This is similar to ActionMailer’s :test delivery_method and its ActionMailer::Base.deliveries array.

Example:

require 'sidekiq/testing'

assert_equal 0, HardWorker.jobs.size
HardWorker.perform_async(:something)
assert_equal 1, HardWorker.jobs.size
assert_equal :something, HardWorker.jobs[0]['args'][0]

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size

You can also clear and drain all workers’ jobs:

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

MyMailer.delay.send_welcome_email('[email protected]')
MyModel.delay.do_something_hard

assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size

Sidekiq::Worker.clear_all # or .drain_all

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

This can be useful to make sure jobs don’t linger between tests:

RSpec.configure do |config|
  config.before(:each) do
    Sidekiq::Worker.clear_all
  end
end

or for acceptance testing, i.e. with cucumber:

AfterStep do
  Sidekiq::Worker.drain_all
end

When I sign up as "[email protected]"
Then I should receive a welcome email to "[email protected]"

Instance Method Summary collapse

Instance Method Details

#clearObject

Clear all jobs for this worker



269
270
271
# File 'lib/sidekiq/testing.rb', line 269

def clear
  Queues.clear_for(queue, to_s)
end

#client_push(item) ⇒ Object

:nodoc:



236
237
238
239
240
241
242
243
244
# File 'lib/sidekiq/worker.rb', line 236

def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end

  Sidekiq::Client.new(pool).push(item)
end

#delay(*args) ⇒ Object

Raises:

  • (ArgumentError)


183
184
185
# File 'lib/sidekiq/worker.rb', line 183

def delay(*args)
  raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async"
end

#delay_for(*args) ⇒ Object

Raises:

  • (ArgumentError)


187
188
189
# File 'lib/sidekiq/worker.rb', line 187

def delay_for(*args)
  raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in"
end

#delay_until(*args) ⇒ Object

Raises:

  • (ArgumentError)


191
192
193
# File 'lib/sidekiq/worker.rb', line 191

def delay_until(*args)
  raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
end

#drainObject

Drain and run all jobs for this worker



274
275
276
277
278
279
280
# File 'lib/sidekiq/testing.rb', line 274

def drain
  while jobs.any?
    next_job = jobs.first
    Queues.delete_for(next_job["jid"], next_job["queue"], to_s)
    process_job(next_job)
  end
end

#execute_job(worker, args) ⇒ Object



299
300
301
# File 'lib/sidekiq/testing.rb', line 299

def execute_job(worker, args)
  worker.perform(*args)
end

#jobsObject

Jobs queued for this worker



264
265
266
# File 'lib/sidekiq/testing.rb', line 264

def jobs
  Queues.jobs_by_worker[to_s]
end

#perform_async(*args) ⇒ Object



199
200
201
# File 'lib/sidekiq/worker.rb', line 199

def perform_async(*args)
  client_push("class" => self, "args" => args)
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, numeric or something that acts

numeric (like an activesupport time interval).


205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/sidekiq/worker.rb', line 205

def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)

  item = {"class" => self, "args" => args}

  # Optimization to enqueue something now that is scheduled to go out now or in the past
  item["at"] = ts if ts > now

  client_push(item)
end

#perform_oneObject

Pop out a single job and perform it

Raises:



283
284
285
286
287
288
# File 'lib/sidekiq/testing.rb', line 283

def perform_one
  raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
  next_job = jobs.first
  Queues.delete_for(next_job["jid"], queue, to_s)
  process_job(next_job)
end

#process_job(job) ⇒ Object



290
291
292
293
294
295
296
297
# File 'lib/sidekiq/testing.rb', line 290

def process_job(job)
  worker = new
  worker.jid = job["jid"]
  worker.bid = job["bid"] if worker.respond_to?(:bid=)
  Sidekiq::Testing.server_middleware.invoke(worker, job, job["queue"]) do
    execute_job(worker, job["args"])
  end
end

#queueObject

Queue for this worker



259
260
261
# File 'lib/sidekiq/testing.rb', line 259

def queue
  get_sidekiq_options["queue"]
end

#set(options) ⇒ Object



195
196
197
# File 'lib/sidekiq/worker.rb', line 195

def set(options)
  Setter.new(self, options)
end

#sidekiq_options(opts = {}) ⇒ Object

Allows customization for this type of Worker. Legal options:

queue - use a named queue for this Worker, default 'default'
retry - enable the RetryJobs middleware for this Worker, *true* to use the default
   or *Integer* count
backtrace - whether to save any error backtrace in the retry payload to display in web UI,
   can be true, false or an integer number of lines to save, default *false*
pool - use the given Redis connection pool to push this type of job to a given shard.

In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.



232
233
234
# File 'lib/sidekiq/worker.rb', line 232

def sidekiq_options(opts = {})
  super
end