Module: Sidekiq::Worker::ClassMethods
- Defined in:
- lib/sidekiq/testing.rb,
lib/sidekiq/worker.rb
Overview
The Sidekiq testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.
This is similar to ActionMailer’s :test delivery_method and its ActionMailer::Base.deliveries array.
Example:
require 'sidekiq/testing'
assert_equal 0, HardWorker.jobs.size
HardWorker.perform_async(:something)
assert_equal 1, HardWorker.jobs.size
assert_equal :something, HardWorker.jobs[0]['args'][0]
assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
You can also clear and drain all workers’ jobs:
assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
MyModel.delay.do_something_hard
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size
Sidekiq::Worker.clear_all # or .drain_all
assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size
This can be useful to make sure jobs don’t linger between tests:
RSpec.configure do |config|
config.before(:each) do
Sidekiq::Worker.clear_all
end
end
or for acceptance testing, i.e. with cucumber:
AfterStep do
Sidekiq::Worker.drain_all
end
When I sign up as "[email protected]"
Then I should receive a welcome email to "[email protected]"
Instance Method Summary collapse
-
#clear ⇒ Object
Clear all jobs for this worker.
-
#client_push(item) ⇒ Object
:nodoc:.
- #delay(*args) ⇒ Object
- #delay_for(*args) ⇒ Object
- #delay_until(*args) ⇒ Object
-
#drain ⇒ Object
Drain and run all jobs for this worker.
- #execute_job(worker, args) ⇒ Object
-
#jobs ⇒ Object
Jobs queued for this worker.
- #perform_async(*args) ⇒ Object
-
#perform_bulk(items, batch_size: 1_000) ⇒ Object
Push a large number of jobs to Redis, while limiting the batch of each job payload to 1,000.
-
#perform_in(interval, *args) ⇒ Object
(also: #perform_at)
intervalmust be a timestamp, numeric or something that acts numeric (like an activesupport time interval). -
#perform_inline(*args) ⇒ Object
Inline execution of job’s perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware.
-
#perform_one ⇒ Object
Pop out a single job and perform it.
- #process_job(job) ⇒ Object
-
#queue ⇒ Object
Queue for this worker.
- #queue_as(q) ⇒ Object
- #set(options) ⇒ Object
-
#sidekiq_options(opts = {}) ⇒ Object
Allows customization for this type of Worker.
Instance Method Details
#clear ⇒ Object
Clear all jobs for this worker
269 270 271 |
# File 'lib/sidekiq/testing.rb', line 269 def clear Queues.clear_for(queue, to_s) end |
#client_push(item) ⇒ Object
:nodoc:
356 357 358 359 360 361 |
# File 'lib/sidekiq/worker.rb', line 356 def client_push(item) # :nodoc: pool = Thread.current[:sidekiq_via_pool] || ["pool"] || Sidekiq.redis_pool stringified_item = item.transform_keys(&:to_s) Sidekiq::Client.new(pool).push(stringified_item) end |
#delay(*args) ⇒ Object
266 267 268 |
# File 'lib/sidekiq/worker.rb', line 266 def delay(*args) raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async" end |
#delay_for(*args) ⇒ Object
270 271 272 |
# File 'lib/sidekiq/worker.rb', line 270 def delay_for(*args) raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in" end |
#delay_until(*args) ⇒ Object
274 275 276 |
# File 'lib/sidekiq/worker.rb', line 274 def delay_until(*args) raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at" end |
#drain ⇒ Object
Drain and run all jobs for this worker
274 275 276 277 278 279 280 |
# File 'lib/sidekiq/testing.rb', line 274 def drain while jobs.any? next_job = jobs.first Queues.delete_for(next_job["jid"], next_job["queue"], to_s) process_job(next_job) end end |
#execute_job(worker, args) ⇒ Object
299 300 301 |
# File 'lib/sidekiq/testing.rb', line 299 def execute_job(worker, args) worker.perform(*args) end |
#jobs ⇒ Object
Jobs queued for this worker
264 265 266 |
# File 'lib/sidekiq/testing.rb', line 264 def jobs Queues.jobs_by_worker[to_s] end |
#perform_async(*args) ⇒ Object
286 287 288 |
# File 'lib/sidekiq/worker.rb', line 286 def perform_async(*args) Setter.new(self, {}).perform_async(*args) end |
#perform_bulk(items, batch_size: 1_000) ⇒ Object
Push a large number of jobs to Redis, while limiting the batch of each job payload to 1,000. This method helps cut down on the number of round trips to Redis, which can increase the performance of enqueueing large numbers of jobs.
items must be an Array of Arrays.
For finer-grained control, use ‘Sidekiq::Client.push_bulk` directly.
Example (3 Redis round trips):
SomeWorker.perform_async(1)
SomeWorker.perform_async(2)
SomeWorker.perform_async(3)
Would instead become (1 Redis round trip):
SomeWorker.perform_bulk([[1], [2], [3]])
315 316 317 318 319 320 321 |
# File 'lib/sidekiq/worker.rb', line 315 def perform_bulk(items, batch_size: 1_000) result = items.each_slice(batch_size).flat_map do |slice| Sidekiq::Client.push_bulk("class" => self, "args" => slice) end result.is_a?(Enumerator::Lazy) ? result.force : result end |
#perform_in(interval, *args) ⇒ Object Also known as: perform_at
interval must be a timestamp, numeric or something that acts
numeric (like an activesupport time interval).
325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/sidekiq/worker.rb', line 325 def perform_in(interval, *args) int = interval.to_f now = Time.now.to_f ts = (int < 1_000_000_000 ? now + int : int) item = {"class" => self, "args" => args} # Optimization to enqueue something now that is scheduled to go out now or in the past item["at"] = ts if ts > now client_push(item) end |
#perform_inline(*args) ⇒ Object
Inline execution of job’s perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware
291 292 293 |
# File 'lib/sidekiq/worker.rb', line 291 def perform_inline(*args) Setter.new(self, {}).perform_inline(*args) end |
#perform_one ⇒ Object
Pop out a single job and perform it
283 284 285 286 287 288 |
# File 'lib/sidekiq/testing.rb', line 283 def perform_one raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty? next_job = jobs.first Queues.delete_for(next_job["jid"], queue, to_s) process_job(next_job) end |
#process_job(job) ⇒ Object
290 291 292 293 294 295 296 297 |
# File 'lib/sidekiq/testing.rb', line 290 def process_job(job) worker = new worker.jid = job["jid"] worker.bid = job["bid"] if worker.respond_to?(:bid=) Sidekiq::Testing.server_middleware.invoke(worker, job, job["queue"]) do execute_job(worker, job["args"]) end end |
#queue ⇒ Object
Queue for this worker
259 260 261 |
# File 'lib/sidekiq/testing.rb', line 259 def queue ["queue"] end |
#queue_as(q) ⇒ Object
278 279 280 |
# File 'lib/sidekiq/worker.rb', line 278 def queue_as(q) ("queue" => q.to_s) end |
#set(options) ⇒ Object
282 283 284 |
# File 'lib/sidekiq/worker.rb', line 282 def set() Setter.new(self, ) end |
#sidekiq_options(opts = {}) ⇒ Object
Allows customization for this type of Worker. Legal options:
queue - use a named queue for this Worker, default 'default'
retry - enable the RetryJobs middleware for this Worker, *true* to use the default
or *Integer* count
backtrace - whether to save any error backtrace in the retry payload to display in web UI,
can be true, false or an integer number of lines to save, default *false*
pool - use the given Redis connection pool to push this type of job to a given shard.
In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.
352 353 354 |
# File 'lib/sidekiq/worker.rb', line 352 def (opts = {}) super end |