Class: Faktory::Job::Setter
- Inherits:
-
Object
- Object
- Faktory::Job::Setter
- Defined in:
- lib/faktory/job.rb
Overview
This helper class encapsulates the set options for ‘set`, e.g.
SomeJob.set(queue: 'foo').perform_async(....)
Instance Method Summary collapse
-
#client_push(item) ⇒ Object
:nodoc:.
-
#initialize(opts) ⇒ Setter
constructor
A new instance of Setter.
- #perform_async(*args) ⇒ Object
-
#perform_in(interval, *args) ⇒ Object
(also: #perform_at)
intervalmust be a timestamp, numeric or something that acts numeric (like an activesupport time interval).
Constructor Details
#initialize(opts) ⇒ Setter
55 56 57 |
# File 'lib/faktory/job.rb', line 55 def initialize(opts) @opts = opts end |
Instance Method Details
#client_push(item) ⇒ Object
:nodoc:
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/faktory/job.rb', line 80 def client_push(item) # :nodoc: # stringify item.keys.each do |key| item[key.to_s] = item.delete(key) end item["jid"] ||= SecureRandom.hex(12) item["queue"] ||= "default" pool = Thread.current[:faktory_via_pool] || item["pool"] || Faktory.server_pool item.delete("pool") # the payload hash is shallow copied by `merge` calls BUT we don't deep clone # the 'custom' child hash which can be problematic if we mutate it within middleware. # Proactively dup it first. item["custom"] = item["custom"].dup if item["custom"] Faktory.client_middleware.invoke(item, pool) do pool.with do |c| c.push(item) end end end |
#perform_async(*args) ⇒ Object
59 60 61 |
# File 'lib/faktory/job.rb', line 59 def perform_async(*args) client_push(@opts.merge('args'.freeze => args)) end |
#perform_in(interval, *args) ⇒ Object Also known as: perform_at
interval must be a timestamp, numeric or something that acts
numeric (like an activesupport time interval).
65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/faktory/job.rb', line 65 def perform_in(interval, *args) int = interval.to_f now = Time.now.to_f ts = (int < 1_000_000_000 ? now + int : int) at = Time.at(ts).utc.to_datetime.rfc3339(9) item = @opts.merge('args'.freeze => args, 'at'.freeze => at) # Optimization to enqueue something now that is scheduled to go out now or in the past item.delete('at'.freeze) if ts <= now client_push(item) end |