Class: Resque::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/resque/job.rb

Overview

A Resque::Job represents a unit of work. Each job lives on a single queue and has an associated payload object. The payload is a hash with two attributes: ‘class` and `args`. The `class` is the name of the Ruby class which should be used to run the job. The `args` are an array of arguments which should be passed to the Ruby class’s ‘perform` class-level method.

You can manually run a job using this code:

job = Resque::Job.reserve(:high)
klass = Resque::Job.constantize(job.payload['class'])
klass.perform(*job.payload['args'])

Constant Summary collapse

DontPerform =

Raise Resque::Job::DontPerform from a before_perform hook to abort the job.

Class.new(StandardError)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, payload) ⇒ Job

Returns a new instance of Job.



134
135
136
137
138
# File 'lib/resque/job.rb', line 134

def initialize(queue, payload)
  @queue = queue
  @payload = payload
  @failure_hooks_ran = false
end

Instance Attribute Details

#payloadObject (readonly)

This job’s associated payload object.



132
133
134
# File 'lib/resque/job.rb', line 132

def payload
  @payload
end

#queueObject (readonly)

The name of the queue from which this job was pulled (or is to be placed)



129
130
131
# File 'lib/resque/job.rb', line 129

def queue
  @queue
end

#workerObject

The worker object which is currently processing this job.



125
126
127
# File 'lib/resque/job.rb', line 125

def worker
  @worker
end

Class Method Details

.create(queue, klass, *args) ⇒ Object

Creates a job by placing it on a queue. Expects a string queue name, a string class name, and an optional array of arguments to pass to the class’ ‘perform` method.

Raises an exception if no queue or class is given.



145
146
147
148
149
150
151
152
153
154
155
# File 'lib/resque/job.rb', line 145

def self.create(queue, klass, *args)
  Resque.validate(klass, queue)

  if Resque.inline?
    # Instantiating a Resque::Job and calling perform on it so callbacks run
    # decode(encode(args)) to ensure that args are normalized in the same manner as a non-inline job
    new(:inline, {'class' => klass, 'args' => decode(encode(args))}).perform
  else
    Resque.push(queue, :class => klass.to_s, :args => args)
  end
end

.decode(object) ⇒ Object

Given a string, returns a Ruby object.



59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/resque/job.rb', line 59

def self.decode(object)
  return unless object

  begin
    if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
      MultiJson.load object
    else
      MultiJson.decode object
    end
  rescue ::MultiJson::DecodeError => e
    raise DecodeException, e.message, e.backtrace
  end
end

.destroy(queue, klass, *args) ⇒ Object

Removes a job from a queue. Expects a string queue name, a string class name, and, optionally, args.

Returns the number of jobs destroyed.

If no args are provided, it will remove all jobs of the class provided.

That is, for these two jobs:

{ ‘class’ => ‘UpdateGraph’, ‘args’ => [‘defunkt’] } { ‘class’ => ‘UpdateGraph’, ‘args’ => [‘mojombo’] }

The following call will remove both:

Resque::Job.destroy(queue, 'UpdateGraph')

Whereas specifying args will only remove the 2nd job:

Resque::Job.destroy(queue, 'UpdateGraph', 'mojombo')

This method can be potentially very slow and memory intensive, depending on the size of your queue, as it loads all jobs into a Ruby array before processing.



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/resque/job.rb', line 181

def self.destroy(queue, klass, *args)
  klass = klass.to_s
  queue = "queue:#{queue}"
  destroyed = 0

  if args.empty?
    redis.lrange(queue, 0, -1).each do |string|
      if decode(string)['class'] == klass
        destroyed += redis.lrem(queue, 0, string).to_i
      end
    end
  else
    destroyed += redis.lrem(queue, 0, encode(:class => klass, :args => args))
  end

  destroyed
end

.encode(object) ⇒ Object

Given a Ruby object, returns a string suitable for storage in a queue.



50
51
52
53
54
55
56
# File 'lib/resque/job.rb', line 50

def self.encode(object)
  if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
    MultiJson.dump object
  else
    MultiJson.encode object
  end
end

.redisObject



19
20
21
# File 'lib/resque/job.rb', line 19

def self.redis
  Resque.redis
end

.reserve(queue) ⇒ Object

Given a string queue name, returns an instance of Resque::Job if any jobs are available. If not, returns nil.



201
202
203
204
# File 'lib/resque/job.rb', line 201

def self.reserve(queue)
  return unless payload = Resque.pop(queue)
  new(queue, payload)
end

Instance Method Details

#==(other) ⇒ Object

Equality



313
314
315
316
317
# File 'lib/resque/job.rb', line 313

def ==(other)
  queue == other.queue &&
    payload_class == other.payload_class &&
    args == other.args
end

#after_hooksObject



327
328
329
# File 'lib/resque/job.rb', line 327

def after_hooks
  @after_hooks ||= Plugin.after_hooks(payload_class)
end

#argsObject

Returns an array of args represented in this job’s payload.



285
286
287
# File 'lib/resque/job.rb', line 285

def args
  @payload['args']
end

#around_hooksObject



323
324
325
# File 'lib/resque/job.rb', line 323

def around_hooks
  @around_hooks ||= Plugin.around_hooks(payload_class)
end

#before_hooksObject



319
320
321
# File 'lib/resque/job.rb', line 319

def before_hooks
  @before_hooks ||= Plugin.before_hooks(payload_class)
end

#classify(dashed_word) ⇒ Object

Given a word with dashes, returns a camel cased version of it.

classify(‘job-name’) # => ‘JobName’



76
77
78
# File 'lib/resque/job.rb', line 76

def classify(dashed_word)
  dashed_word.split('-').each { |part| part[0] = part[0].chr.upcase }.join
end

#constantize(camel_cased_word) ⇒ Object

Tries to find a constant with the name specified in the argument string:

constantize(“Module”) # => Module constantize(“Test::Unit”) # => Test::Unit

The name is assumed to be the one of a top-level constant, no matter whether it starts with “::” or not. No lexical context is taken into account:

C = ‘outside’ module M

C = 'inside'
C # => 'inside'
constantize("C") # => 'outside', same as ::C

end

NameError is raised when the constant is unknown.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/resque/job.rb', line 97

def constantize(camel_cased_word)
  camel_cased_word = camel_cased_word.to_s

  if camel_cased_word.include?('-')
    camel_cased_word = classify(camel_cased_word)
  end

  names = camel_cased_word.split('::')
  names.shift if names.empty? || names.first.empty?

  constant = Object
  names.each do |name|
    args = Module.method(:const_get).arity != 1 ? [false] : []

    if constant.const_defined?(name, *args)
      constant = constant.const_get(name)
    else
      constant = constant.const_missing(name)
    end
  end
  constant
end

#decode(object) ⇒ Object

Given a string, returns a Ruby object.



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/resque/job.rb', line 34

def decode(object)
  return unless object

  begin
    if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
      MultiJson.load object
    else
      MultiJson.decode object
    end
  rescue ::MultiJson::DecodeError => e
    raise DecodeException, e.message, e.backtrace
  end
end

#encode(object) ⇒ Object

Given a Ruby object, returns a string suitable for storage in a queue.



25
26
27
28
29
30
31
# File 'lib/resque/job.rb', line 25

def encode(object)
  if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
    MultiJson.dump object
  else
    MultiJson.encode object
  end
end

#fail(exception) ⇒ Object

Given an exception object, hands off the needed parameters to the Failure module.



291
292
293
294
295
296
297
298
# File 'lib/resque/job.rb', line 291

def fail(exception)
  run_failure_hooks(exception)
  Failure.create \
    :payload   => payload,
    :exception => exception,
    :worker    => worker,
    :queue     => queue
end

#failure_hooksObject



331
332
333
# File 'lib/resque/job.rb', line 331

def failure_hooks
  @failure_hooks ||= Plugin.failure_hooks(payload_class)
end

#has_payload_class?Boolean

Returns:

  • (Boolean)


278
279
280
281
282
# File 'lib/resque/job.rb', line 278

def has_payload_class?
  payload_class != Object
rescue NameError
  false
end

#inspectObject

String representation



307
308
309
310
# File 'lib/resque/job.rb', line 307

def inspect
  obj = @payload
  "(Job{%s} | %s | %s)" % [ @queue, obj['class'], obj['args'].inspect ]
end

#payload_classObject

Returns the actual class constant represented in this job’s payload.



267
268
269
# File 'lib/resque/job.rb', line 267

def payload_class
  @payload_class ||= constantize(@payload['class'])
end

#payload_class_nameObject

Returns the payload class as a string without raising NameError



272
273
274
275
276
# File 'lib/resque/job.rb', line 272

def payload_class_name
  payload_class.to_s
rescue NameError
  'No Name'
end

#performObject

Attempts to perform the work represented by this job instance. Calls #perform on the class given in the payload with the arguments given in the payload.



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/resque/job.rb', line 209

def perform
  job = payload_class
  job_args = args || []
  job_was_performed = false

  begin
    # Execute before_perform hook. Abort the job gracefully if
    # Resque::DontPerform is raised.
    begin
      before_hooks.each do |hook|
        job.send(hook, *job_args)
      end
    rescue DontPerform
      return false
    end

    # Execute the job. Do it in an around_perform hook if available.
    if around_hooks.empty?
      job.perform(*job_args)
      job_was_performed = true
    else
      # We want to nest all around_perform plugins, with the last one
      # finally calling perform
      stack = around_hooks.reverse.inject(nil) do |last_hook, hook|
        if last_hook
          lambda do
            job.send(hook, *job_args) { last_hook.call }
          end
        else
          lambda do
            job.send(hook, *job_args) do
              result = job.perform(*job_args)
              job_was_performed = true
              result
            end
          end
        end
      end
      stack.call
    end

    # Execute after_perform hook
    after_hooks.each do |hook|
      job.send(hook, *job_args)
    end

    # Return true if the job was performed
    return job_was_performed

  # If an exception occurs during the job execution, look for an
  # on_failure hook then re-raise.
  rescue Object => e
    run_failure_hooks(e)
    raise e
  end
end

#recreateObject

Creates an identical job, essentially placing this job back on the queue.



302
303
304
# File 'lib/resque/job.rb', line 302

def recreate
  self.class.create(queue, payload_class, *args)
end

#redisObject



15
16
17
# File 'lib/resque/job.rb', line 15

def redis
  Resque.redis
end

#run_failure_hooks(exception) ⇒ Object



335
336
337
338
339
340
341
342
343
344
# File 'lib/resque/job.rb', line 335

def run_failure_hooks(exception)
  begin
    job_args = args || []
    if has_payload_class?
      failure_hooks.each { |hook| payload_class.send(hook, exception, *job_args) } unless @failure_hooks_ran
    end
  ensure
    @failure_hooks_ran = true
  end
end