Class: Sidekiq::Worker::Setter

Inherits:
Object
  • Object
show all
Includes:
JobUtil
Defined in:
lib/sidekiq/worker.rb

Overview

This helper class encapsulates the options given to `set`, e.g.

SomeWorker.set(queue: 'foo').perform_async(....)

Instance Method Summary collapse

Methods included from JobUtil

#normalize_item, #normalized_hash, #validate

Constructor Details

#initialize(klass, opts) ⇒ Setter

Returns a new instance of Setter.



176
177
178
179
180
181
182
183
# File 'lib/sidekiq/worker.rb', line 176

# Builds a setter holding the per-call job options for +klass+.
#
# @param klass [Object] the worker class the job will be pushed for
# @param opts [Hash] job options (e.g. <tt>queue: 'foo'</tt>); the
#   +:wait_until+ and +:wait+ keys are consumed here and handed to +at+
def initialize(klass, opts)
  @klass = klass
  # Work on a copy so stripping the scheduling keys below never mutates
  # the hash object the caller passed in.
  @opts = opts.dup

  # ActiveJob compatibility
  # Remove both keys unconditionally so neither lingers in the stored
  # options; :wait_until still wins when both are supplied.
  wait_until = @opts.delete(:wait_until)
  wait = @opts.delete(:wait)
  interval = wait_until || wait
  at(interval) if interval
end

Instance Method Details

#perform_async(*args) ⇒ Object



192
193
194
195
196
197
198
# File 'lib/sidekiq/worker.rb', line 192

# Pushes a job for the wrapped class with the stored options, or runs it
# inline when the "sync" option is exactly +true+.
#
# @param args [Array] arguments for the job's +perform+
# @return [Object] whatever +perform_inline+ or +@klass.client_push+ returns
def perform_async(*args)
  # Options are commonly given with symbol keys (set(sync: true)), so the
  # flag is honoured under either key spelling.
  if @opts["sync"] == true || @opts[:sync] == true
    perform_inline(*args)
  else
    @klass.client_push(@opts.merge("args" => args, "class" => @klass))
  end
end

#perform_bulk(args, batch_size: 1_000) ⇒ Object



237
238
239
240
241
242
243
244
# File 'lib/sidekiq/worker.rb', line 237

# Pushes many jobs in batches through Sidekiq::Client.push_bulk.
#
# @param args [Enumerable] collection of per-job argument lists
# @param batch_size [Integer] number of entries sent per push_bulk call
# @return [Array] the concatenated results of every push_bulk call
def perform_bulk(args, batch_size: 1_000)
  base = @opts.transform_keys(&:to_s)

  pushed = args.each_slice(batch_size).flat_map { |batch|
    payload = base.merge("class" => @klass, "args" => batch)
    Sidekiq::Client.push_bulk(payload)
  }

  # A lazy input produces a lazy result; realize it before returning.
  return pushed.force if pushed.is_a?(Enumerator::Lazy)

  pushed
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, a numeric, or something that acts like a numeric (such as an ActiveSupport time interval).


248
249
250
# File 'lib/sidekiq/worker.rb', line 248

# Applies +interval+ through +at+, then enqueues the job with +args+.
#
# @param interval [Object] value handed unchanged to +at+
# @param args [Array] arguments for the job's +perform+
# @return [Object] the result of +perform_async+
def perform_in(interval, *args)
  scheduled = at(interval)
  scheduled.perform_async(*args)
end

#perform_inline(*args) ⇒ Object Also known as: perform_sync

Explicit inline execution of a job. Returns nil if the job did not execute, true otherwise.



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/sidekiq/worker.rb', line 202

# Explicit inline execution of a job: builds the payload, passes it through
# the client middleware, round-trips it via JSON, then runs the job through
# the server middleware in the current thread.
#
# @param args [Array] arguments for the job's +perform+
# @return [true, nil] nil if either middleware chain returned a falsy
#   value (the job did not execute), true otherwise
def perform_inline(*args)
  raw = @opts.merge("args" => args, "class" => @klass).transform_keys(&:to_s)

  # validate and normalize payload
  item = normalize_item(raw)
  queue = item["queue"]

  # run client-side middleware
  result = Sidekiq.client_middleware.invoke(item["class"], item, queue, Sidekiq.redis_pool) do
    item
  end
  return nil unless result

  # round-trip the payload via JSON
  msg = Sidekiq.load_json(Sidekiq.dump_json(item))

  # prepare the job instance
  # Resolve the class with core Ruby: String#constantize is an ActiveSupport
  # extension and is not available unless ActiveSupport is loaded.
  # Object.const_get also resolves namespaced names such as "Foo::Bar".
  klass = Object.const_get(msg["class"])
  job = klass.new
  job.jid = msg["jid"]
  job.bid = msg["bid"] if job.respond_to?(:bid)

  # run the job through server-side middleware
  result = Sidekiq.server_middleware.invoke(job, msg, msg["queue"]) do
    # perform it
    job.perform(*msg["args"])
    true
  end
  return nil unless result
  # jobs do not return a result. they should store any
  # modified state.
  true
end

#set(options) ⇒ Object



185
186
187
188
189
190
# File 'lib/sidekiq/worker.rb', line 185

# Merges further options into this setter and returns it for chaining.
#
# @param options [Hash] options to merge; the +:wait_until+ and +:wait+
#   keys are consumed here and handed to +at+ instead of being stored
# @return [self]
def set(options)
  # Copy first so the caller's hash is left untouched, and strip both
  # scheduling keys so neither is merged into the stored options.
  overrides = options.dup
  wait_until = overrides.delete(:wait_until)
  wait = overrides.delete(:wait)

  @opts.merge!(overrides)

  # :wait_until takes precedence over :wait when both are supplied.
  interval = wait_until || wait
  at(interval) if interval
  self
end