Module: Disc::Job::ClassMethods
- Defined in:
- lib/disc/job.rb,
lib/disc/testing.rb
Instance Method Summary collapse
- #[](disque_id) ⇒ Object
- #disc(queue: nil, **options) ⇒ Object
- #disc_options ⇒ Object
- #disque ⇒ Object
- #disque=(disque) ⇒ Object
-
#enqueue(args = [], at: nil, queue: nil, **options) ⇒ Object
Disc’s ‘#enqueue` is the main user-facing method of a Disc job, it enqueues a job with a given set of arguments in Disque, so it can be picked up by a Disc worker process.
- #mocked_queue ⇒ Object
- #perform(arguments) ⇒ Object
- #queue ⇒ Object
Instance Method Details
#[](disque_id) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/disc/job.rb', line 20 def [](disque_id) job_data = disque.call("SHOW", disque_id) return nil if job_data.nil? job = self.new job_data = Hash[*job_data] job.disque_id = disque_id job.arguments = Disc.deserialize(job_data.fetch('body')).fetch('arguments') return job end |
#disc(queue: nil, **options) ⇒ Object
41 42 43 44 |
# File 'lib/disc/job.rb', line 41 def disc(queue: nil, **) @queue = queue = end |
#disc_options ⇒ Object
46 47 48 |
# File 'lib/disc/job.rb', line 46 def ||= {} end |
#disque ⇒ Object
33 34 35 |
# File 'lib/disc/job.rb', line 33 def disque defined?(@disque) ? @disque : Disc.disque end |
#disque=(disque) ⇒ Object
37 38 39 |
# File 'lib/disc/job.rb', line 37 def disque=(disque) @disque = disque end |
#enqueue(args = [], at: nil, queue: nil, **options) ⇒ Object
Disc’s ‘#enqueue` is the main user-facing method of a Disc job, it
enqueues a job with a given set of arguments in Disque, so it can be
picked up by a Disc worker process.
Parameters:
‘arguments` - an optional array of arguments with which to execute
the job's #perform method.
‘at` - an optional named parameter specifying a moment in the
future in which to run the job, must respond to
`#to_time`.
‘queue` - an optional named parameter specifying the name of the
queue in which to store the job, defaults to the class
Disc queue or to 'default' if no Disc queue is specified
in the class.
`**options` - an optional hash of to forward internally to
[disque-rb](https://github.com/soveran/disque-rb)'s
`#push` method, valid options are:
`replicate: <count>` - specifies the number of nodes the job should
be replicated to.
‘delay: <sec>` - specifies a delay time in seconds for the job
to be delivered to a Disc worker, it is ignored
if using the `at` parameter.
‘ttl: <sec>` - specifies the job’s time to live in seconds:
after this time, the job is deleted even if
it was not successfully delivered. If not
specified, the default TTL is one day.
‘maxlen: <count>` - specifies that if there are already <count>
queued for the specified queue name,
the is refused.
‘async: true` - asks the server to let the command return ASAP
and replicate the job to other nodes in the background.
CAVEATS
For convenience, any object can be passed as the ‘arguments` parameter,
`Array.wrap` will be used internally to preserve the array structure.
The ‘arguments` parameter is serialized for storage using `Disc.serialize`
and Disc workers picking it up use `Disc.deserialize` on it, both methods
use standard library json but can be overriden by the user
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/disc/job.rb', line 109 def enqueue(args = [], at: nil, queue: nil, **) = .merge().tap do |opt| opt[:delay] = at.to_time.to_i - DateTime.now.to_time.to_i unless at.nil? end disque_id = disque.push( queue || self.queue, Disc.serialize({ class: self.name, arguments: Array(args) }), Disc.disque_timeout, ) return self[disque_id] end |
#mocked_queue ⇒ Object
2 3 4 |
# File 'lib/disc/testing.rb', line 2 def mocked_queue @_mocked_queue ||= [] end |
#perform(arguments) ⇒ Object
54 55 56 |
# File 'lib/disc/job.rb', line 54 def perform(arguments) self.new.perform(*arguments) end |
#queue ⇒ Object
50 51 52 |
# File 'lib/disc/job.rb', line 50 def queue @queue || Disc.default_queue end |