Class: Sidekiq::Job::Setter

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::JobUtil
Defined in:
lib/sidekiq/job.rb

Overview

This helper class encapsulates the options passed to `set`, e.g.

SomeJob.set(queue: 'foo').perform_async(....)

Constant Summary

Constants included from Sidekiq::JobUtil

Sidekiq::JobUtil::TRANSIENT_ATTRIBUTES

Instance Method Summary collapse

Methods included from Sidekiq::JobUtil

#normalize_item, #normalized_hash, #validate, #verify_json

Constructor Details

#initialize(klass, opts) ⇒ Setter

Returns a new instance of Setter.



176
177
178
179
180
181
182
183
184
# File 'lib/sidekiq/job.rb', line 176

# Builds a setter bound to +klass+ with the given options.
#
# @param klass the job class this setter will act on behalf of
# @param opts [Hash] options; keys may be symbols or strings
def initialize(klass, opts)
  @klass = klass
  # Keys are stored as strings so later lookups never depend on the
  # caller's choice of symbol vs. string keys.
  @opts = opts.transform_keys { |key| key.to_s }

  # ActiveJob compatibility: "wait_until" takes priority; "wait" is only
  # consulted (and removed) when "wait_until" is absent or falsy.
  delay = @opts.delete("wait_until")
  delay ||= @opts.delete("wait")
  at(delay) if delay
end

Instance Method Details

#perform_async(*args) ⇒ Object



194
195
196
197
198
199
200
# File 'lib/sidekiq/job.rb', line 194

# Enqueues the job with the stored options, or runs it inline when the
# "sync" option is exactly +true+.
#
# @param args job arguments
# @return whatever +perform_inline+ or +@klass.client_push+ returns
def perform_async(*args)
  # Only a literal true triggers inline execution; other truthy values do not.
  return perform_inline(*args) if @opts["sync"] == true

  payload = @opts.merge("args" => args, "class" => @klass)
  @klass.client_push(payload)
end

#perform_bulk(args, batch_size: 1_000) ⇒ Object



240
241
242
243
# File 'lib/sidekiq/job.rb', line 240

# Pushes many jobs at once through a client built by the job class.
#
# @param args the bulk argument list handed to the client as "args"
# @param batch_size [Integer] forwarded to the client under the
#   symbol key +:batch_size+
# @return whatever the client's +push_bulk+ returns
def perform_bulk(args, batch_size: 1_000)
  # Stored options come first so "class" and "args" always override them.
  payload = @opts.merge("class" => @klass, "args" => args, :batch_size => batch_size)
  @klass.build_client.push_bulk(payload)
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, a numeric, or something that acts like a numeric (such as an ActiveSupport time interval).


247
248
249
# File 'lib/sidekiq/job.rb', line 247

# Applies +interval+ via +at+ and then enqueues the job on the result.
#
# @param interval passed unchanged to +at+
# @param args job arguments
# @return whatever +perform_async+ returns
def perform_in(interval, *args)
  scheduled = at(interval)
  scheduled.perform_async(*args)
end

#perform_inline(*args) ⇒ Object Also known as: perform_sync

Explicit inline execution of a job. Returns nil if the job did not execute, true otherwise.



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/sidekiq/job.rb', line 204

# Runs the job in the current process instead of enqueueing it.
#
# The payload is normalized, passed through the client middleware,
# round-tripped through JSON, and then executed inside the server
# middleware.
#
# @param args job arguments
# @return [true, nil] true when the job ran; nil when either middleware
#   chain returned a falsy value
def perform_inline(*args)
  # validate and normalize the raw payload
  payload = normalize_item(@opts.merge("args" => args, "class" => @klass))
  target_queue = payload["queue"]

  config = Sidekiq.default_configuration

  # client-side middleware: a falsy result means the job was halted
  accepted = config.client_middleware.invoke(payload["class"], payload, target_queue, config.redis_pool) { payload }
  return unless accepted

  # serialize and deserialize so the job receives JSON-round-tripped data
  message = Sidekiq.load_json(Sidekiq.dump_json(payload))

  # instantiate the job class named in the message
  instance = Object.const_get(message["class"]).new
  instance.jid = message["jid"]
  instance.bid = message["bid"] if instance.respond_to?(:bid)

  # server-side middleware wraps the actual perform call
  executed = config.server_middleware.invoke(instance, message, message["queue"]) do
    instance.perform(*message["args"])
    true
  end

  # the job's own return value is discarded; only success is reported
  true if executed
end

#set(options) ⇒ Object



186
187
188
189
190
191
192
# File 'lib/sidekiq/job.rb', line 186

# Merges additional options into this setter and returns it for chaining.
#
# @param options [Hash] options; keys may be symbols or strings
# @return [self]
def set(options)
  # Keys are stringified to match the internal option hash.
  hash = options.transform_keys(&:to_s)

  # ActiveJob compatibility: both scheduling keys must be read from the
  # incoming options, as in the constructor. Reading "wait" from @opts
  # would ignore the caller's value and merge a stray "wait" key into
  # the stored options.
  interval = hash.delete("wait_until") || hash.delete("wait")

  @opts.merge!(hash)
  at(interval) if interval
  self
end