Module: Disc::Job::ClassMethods

Defined in:
lib/disc/job.rb,
lib/disc/testing.rb

Instance Method Details

#[](disque_id) ⇒ Object



# File 'lib/disc/job.rb', line 20

def [](disque_id)
  job_data = disque.call("SHOW", disque_id)
  return nil if job_data.nil?

  job = self.new
  job_data = Hash[*job_data]

  job.disque_id = disque_id
  job.arguments = Disc.deserialize(job_data.fetch('body')).fetch('arguments')

  return job
end
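
The `Hash[*job_data]` step relies on the `SHOW` reply arriving as a flat array of alternating keys and values. A minimal sketch of that conversion follows, using a made-up reply: the id, queue, and body values are illustrative only, and `JSON.parse` stands in for `Disc.deserialize`, which this page describes as using the standard library json by default.

```ruby
require 'json'

# Hypothetical SHOW reply: a flat array of alternating keys and values.
job_data = [
  'id',    'D-example-id',
  'queue', 'default',
  'body',  JSON.generate({ 'class' => 'Echoer', 'arguments' => ['hello', 42] })
]

# Hash[*array] pairs consecutive elements up as key/value entries.
job_hash = Hash[*job_data]

# JSON.parse stands in for Disc.deserialize here (an assumption).
arguments = JSON.parse(job_hash.fetch('body')).fetch('arguments')
# arguments is now ['hello', 42]
```

Because `fetch` is used for both lookups, a reply without a `'body'` key or a body without `'arguments'` raises `KeyError` instead of silently returning `nil`.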

#disc(queue: nil, **options) ⇒ Object



# File 'lib/disc/job.rb', line 41

def disc(queue: nil, **options)
  @queue = queue
  @disc_options = options
end
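
As a rough illustration of how the values stored by `#disc` are read back, the sketch below copies the `disc`, `disc_options`, and `queue` methods from this page into a stand-in module. `ClassMethodsSketch`, `DEFAULT_QUEUE`, `UrgentJob`, and `PlainJob` are hypothetical names, and `DEFAULT_QUEUE` only stands in for `Disc.default_queue`.

```ruby
# Stand-in for Disc.default_queue; the real value comes from the library.
DEFAULT_QUEUE = 'default'

# Simplified copy of three class methods documented on this page.
module ClassMethodsSketch
  def disc(queue: nil, **options)
    @queue = queue
    @disc_options = options
  end

  def disc_options
    @disc_options ||= {}
  end

  def queue
    @queue || DEFAULT_QUEUE
  end
end

# A job class that configures its own queue and default options.
class UrgentJob
  extend ClassMethodsSketch
  disc queue: 'urgent', ttl: 3600
end

# A job class that never calls `disc` falls back to the defaults.
class PlainJob
  extend ClassMethodsSketch
end

UrgentJob.queue         # 'urgent'
UrgentJob.disc_options  # { ttl: 3600 }
PlainJob.queue          # 'default'
PlainJob.disc_options   # {}
```

Note that each call to `disc` replaces both values, so calling it a second time without `queue:` resets the queue to the default.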

#disc_options ⇒ Object



# File 'lib/disc/job.rb', line 46

def disc_options
  @disc_options ||= {}
end

#disque ⇒ Object



# File 'lib/disc/job.rb', line 33

def disque
  defined?(@disque) ? @disque : Disc.disque
end

#disque=(disque) ⇒ Object



# File 'lib/disc/job.rb', line 37

def disque=(disque)
  @disque = disque
end

#enqueue(args = [], at: nil, queue: nil, **options) ⇒ Object

Disc's `#enqueue` is the main user-facing method of a Disc job. It enqueues
a job with a given set of arguments in Disque, so that it can be picked up
by a Disc worker process.

Parameters:

`args` - an optional array of arguments with which to execute the job's
         `#perform` method.

`at` - an optional named parameter specifying a moment in the future at
       which to run the job; it must respond to `#to_time`.

`queue` - an optional named parameter specifying the name of the queue in
          which to store the job. It defaults to the class's Disc queue,
          or to 'default' if no Disc queue is specified in the class.

`**options` - an optional hash of options to forward internally to
              [disque-rb](https://github.com/soveran/disque-rb)'s
              `#push` method. Valid options are:

  `replicate: <count>` - specifies the number of nodes the job should be
                         replicated to.

  `delay: <sec>` - specifies a delay in seconds before the job is
                   delivered to a Disc worker. It is ignored when the
                   `at` parameter is used.

  `ttl: <sec>` - specifies the job's time to live in seconds. After this
                 time, the job is deleted even if it was not successfully
                 delivered. If not specified, the default TTL is one day.

  `maxlen: <count>` - specifies that if <count> messages are already
                      queued for the specified queue name, the message
                      is refused.

  `async: true` - asks the server to let the command return as soon as
                  possible and replicate the job to other nodes in the
                  background.

CAVEATS

For convenience, any object can be passed as the `args` parameter.
`Array(args)` is used internally, so an existing array is kept as is and
any other object is coerced into one.

The `args` parameter is serialized for storage using `Disc.serialize`, and
Disc workers picking it up use `Disc.deserialize` on it. Both methods use
the standard library json by default but can be overridden by the user.

# File 'lib/disc/job.rb', line 109

def enqueue(args = [], at: nil, queue: nil, **options)
  options = disc_options.merge(options).tap do |opt|
    opt[:delay] = at.to_time.to_i - DateTime.now.to_time.to_i unless at.nil?
  end

  disque_id = disque.push(
    queue || self.queue,
    Disc.serialize({
      class: self.name,
      arguments: Array(args)
    }),
    Disc.disque_timeout,
    options
  )

  return self[disque_id]
end
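
The option handling at the top of `#enqueue` can be traced with plain hashes. The following is only a sketch of that one step, with made-up option values. It pins `now` to a local variable to keep the arithmetic reproducible, whereas the method itself reads `DateTime.now`.

```ruby
require 'date' # provides Time#to_time

# Class-level options (as set through `disc`) and per-call options.
class_options = { ttl: 3600, delay: 5 }
call_options  = { replicate: 2 }

# A hypothetical `at` value one minute in the future.
now = Time.now
at  = now + 60

# Per-call options override class-level ones on key conflicts.
options = class_options.merge(call_options)

# When `at` is given, the delay is recomputed from it, replacing any
# :delay value that was configured earlier.
options[:delay] = at.to_time.to_i - now.to_i unless at.nil?

# options is now { ttl: 3600, delay: 60, replicate: 2 }
```

Since the delay is computed from whole seconds, an `at` value in the past yields a negative delay. The listing above does not guard against that case.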

#mocked_queue ⇒ Object



# File 'lib/disc/testing.rb', line 2

def mocked_queue
  @_mocked_queue ||= []
end

#perform(arguments) ⇒ Object



# File 'lib/disc/job.rb', line 54

def perform(arguments)
  self.new.perform(*arguments)
end
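
The splat in the class-level `perform` means the stored arguments array is spread across the positional parameters of the instance-level `#perform`. A small sketch with a hypothetical `Greeter` class, which mirrors the method above without depending on Disc:

```ruby
class Greeter
  # Hypothetical job body: takes two positional arguments.
  def perform(greeting, name)
    "#{greeting}, #{name}!"
  end

  # Same shape as the class-level perform shown above.
  def self.perform(arguments)
    new.perform(*arguments)
  end
end

result = Greeter.perform(['Hello', 'world'])
# result is "Hello, world!"
```

If the array length does not match the arity of `#perform`, Ruby raises `ArgumentError`, so the arguments passed when enqueuing need to line up with the method signature.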

#queue ⇒ Object



# File 'lib/disc/job.rb', line 50

def queue
  @queue || Disc.default_queue
end