Module: Sidekiq::Worker::ClassMethods

Defined in:
lib/sidekiq/testing.rb,
lib/sidekiq/worker.rb

Overview

The Sidekiq testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.

This is similar to ActionMailer's :test delivery_method and its ActionMailer::Base.deliveries array.

Example:

require 'sidekiq/testing'

assert_equal 0, HardWorker.jobs.size
HardWorker.perform_async(:something)
assert_equal 1, HardWorker.jobs.size
assert_equal :something, HardWorker.jobs[0]['args'][0]

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size

You can also clear and drain all workers' jobs:

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

MyMailer.delay.send_welcome_email('[email protected]')
MyModel.delay.do_something_hard

assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size

Sidekiq::Worker.clear_all # or .drain_all

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

This can be useful to make sure jobs don't linger between tests:

RSpec.configure do |config|
  config.before(:each) do
    Sidekiq::Worker.clear_all
  end
end

or for acceptance testing, i.e. with cucumber:

AfterStep do
  Sidekiq::Worker.drain_all
end

When I sign up as "[email protected]"
Then I should receive a welcome email to "[email protected]"

Instance Method Summary collapse

Instance Method Details

#clearObject

Clear all jobs for this worker


270
271
272
# File 'lib/sidekiq/testing.rb', line 270

def clear
  Queues.clear_for(queue, self.to_s)
end

#client_push(item) ⇒ Object

:nodoc:


135
136
137
138
139
140
141
142
143
# File 'lib/sidekiq/worker.rb', line 135

def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'.freeze] || Sidekiq.redis_pool
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end

  Sidekiq::Client.new(pool).push(item)
end

#delay(*args) ⇒ Object

Raises:

  • (ArgumentError)

69
70
71
# File 'lib/sidekiq/worker.rb', line 69

def delay(*args)
  raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async"
end

#delay_for(*args) ⇒ Object

Raises:

  • (ArgumentError)

73
74
75
# File 'lib/sidekiq/worker.rb', line 73

def delay_for(*args)
  raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in"
end

#delay_until(*args) ⇒ Object

Raises:

  • (ArgumentError)

77
78
79
# File 'lib/sidekiq/worker.rb', line 77

def delay_until(*args)
  raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
end

#drainObject

Drain and run all jobs for this worker


275
276
277
278
279
280
281
# File 'lib/sidekiq/testing.rb', line 275

def drain
  while jobs.any?
    next_job = jobs.first
    Queues.delete_for(next_job["jid"], next_job["queue"], self.to_s)
    process_job(next_job)
  end
end

#execute_job(worker, args) ⇒ Object


300
301
302
# File 'lib/sidekiq/testing.rb', line 300

def execute_job(worker, args)
  worker.perform(*args)
end

#get_sidekiq_optionsObject

:nodoc:


131
132
133
# File 'lib/sidekiq/worker.rb', line 131

def get_sidekiq_options # :nodoc:
  self.sidekiq_options_hash ||= Sidekiq.default_worker_options
end

#jobsObject

Jobs queued for this worker


265
266
267
# File 'lib/sidekiq/testing.rb', line 265

def jobs
  Queues.jobs_by_worker[self.to_s]
end

#perform_async(*args) ⇒ Object


85
86
87
# File 'lib/sidekiq/worker.rb', line 85

def perform_async(*args)
  client_push('class'.freeze => self, 'args'.freeze => args)
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, numeric or something that acts

numeric (like an activesupport time interval).

91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/sidekiq/worker.rb', line 91

def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)

  item = { 'class'.freeze => self, 'args'.freeze => args, 'at'.freeze => ts }

  # Optimization to enqueue something now that is scheduled to go out now or in the past
  item.delete('at'.freeze) if ts <= now

  client_push(item)
end

#perform_oneObject

Pop out a single job and perform it

Raises:


284
285
286
287
288
289
# File 'lib/sidekiq/testing.rb', line 284

def perform_one
  raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
  next_job = jobs.first
  Queues.delete_for(next_job["jid"], queue, self.to_s)
  process_job(next_job)
end

#process_job(job) ⇒ Object


291
292
293
294
295
296
297
298
# File 'lib/sidekiq/testing.rb', line 291

def process_job(job)
  worker = new
  worker.jid = job['jid']
  worker.bid = job['bid'] if worker.respond_to?(:bid=)
  Sidekiq::Testing.server_middleware.invoke(worker, job, job['queue']) do
    execute_job(worker, job['args'])
  end
end

#queueObject

Queue for this worker


260
261
262
# File 'lib/sidekiq/testing.rb', line 260

def queue
  self.sidekiq_options["queue"]
end

#set(options) ⇒ Object


81
82
83
# File 'lib/sidekiq/worker.rb', line 81

def set(options)
  Setter.new(options.merge!('class'.freeze => self))
end

#sidekiq_class_attribute(*attrs) ⇒ Object


145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/sidekiq/worker.rb', line 145

def sidekiq_class_attribute(*attrs)
  instance_reader = true
  instance_writer = true

  attrs.each do |name|
    singleton_class.instance_eval do
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
    end
    define_singleton_method(name) { nil }

    ivar = "@#{name}"

    singleton_class.instance_eval do
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
    end
    define_singleton_method("#{name}=") do |val|
      singleton_class.class_eval do
        undef_method(name) if method_defined?(name) || private_method_defined?(name)
        define_method(name) { val }
      end

      if singleton_class?
        class_eval do
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
          define_method(name) do
            if instance_variable_defined? ivar
              instance_variable_get ivar
            else
              singleton_class.send name
            end
          end
        end
      end
      val
    end

    if instance_reader
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
      define_method(name) do
        if instance_variable_defined?(ivar)
          instance_variable_get ivar
        else
          self.class.public_send name
        end
      end
    end

    if instance_writer
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
      attr_writer name
    end
  end
end

#sidekiq_options(opts = {}) ⇒ Object

Allows customization for this type of Worker. Legal options:

queue - use a named queue for this Worker, default 'default'
retry - enable the RetryJobs middleware for this Worker, *true* to use the default
   or *Integer* count
backtrace - whether to save any error backtrace in the retry payload to display in web UI,
   can be true, false or an integer number of lines to save, default *false*
pool - use the given Redis connection pool to push this type of job to a given shard.

In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.


118
119
120
121
# File 'lib/sidekiq/worker.rb', line 118

def sidekiq_options(opts={})
  # stringify
  self.sidekiq_options_hash = get_sidekiq_options.merge(Hash[opts.map{|k, v| [k.to_s, v]}])
end

#sidekiq_retries_exhausted(&block) ⇒ Object


127
128
129
# File 'lib/sidekiq/worker.rb', line 127

def sidekiq_retries_exhausted(&block)
  self.sidekiq_retries_exhausted_block = block
end

#sidekiq_retry_in(&block) ⇒ Object


123
124
125
# File 'lib/sidekiq/worker.rb', line 123

def sidekiq_retry_in(&block)
  self.sidekiq_retry_in_block = block
end