Class: Faktory::Job::Setter

Inherits:
Object
  • Object
show all
Defined in:
lib/faktory/job.rb

Overview

This helper class encapsulates the set options for ‘set`, e.g.

SomeJob.set(queue: 'foo').perform_async(....)

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Setter



55
56
57
# File 'lib/faktory/job.rb', line 55

def initialize(opts)
  @opts = opts
end

Instance Method Details

#client_push(item) ⇒ Object

:nodoc:



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/faktory/job.rb', line 80

def client_push(item) # :nodoc:
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end
  item["jid"] ||= SecureRandom.hex(12)
  item["queue"] ||= "default"

  pool = Thread.current[:faktory_via_pool] || item["pool"] || Faktory.server_pool
  item.delete("pool")

  # the payload hash is shallow copied by `merge` calls BUT we don't deep clone
  # the 'custom' child hash which can be problematic if we mutate it within middleware.
  # Proactively dup it first.
  item["custom"] = item["custom"].dup if item["custom"]

  Faktory.client_middleware.invoke(item, pool) do
    pool.with do |c|
      c.push(item)
    end
  end
end

#perform_async(*args) ⇒ Object



59
60
61
# File 'lib/faktory/job.rb', line 59

def perform_async(*args)
  client_push(@opts.merge('args'.freeze => args))
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, numeric or something that acts

numeric (like an activesupport time interval).


65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/faktory/job.rb', line 65

def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)
  at = Time.at(ts).utc.to_datetime.rfc3339(9)

  item = @opts.merge('args'.freeze => args, 'at'.freeze => at)

  # Optimization to enqueue something now that is scheduled to go out now or in the past
  item.delete('at'.freeze) if ts <= now

  client_push(item)
end