Class: Reqless::Job

Inherits:
BaseJob show all
Defined in:
lib/reqless/job.rb

Overview

A Reqless job

Defined Under Namespace

Modules: SupportsMiddleware

Constant Summary collapse

# Raised by #perform when a job class has middleware modules but
# SupportsMiddleware is not the last one in the chain.
MiddlewareMisconfiguredError = Class.new(StandardError)
# Raised by #fail in place of the underlying Reqless::LuaScriptError.
CantFailError = Class.new(Reqless::LuaScriptError)
# Raised by #complete in place of the underlying Reqless::LuaScriptError.
CantCompleteError = Class.new(Reqless::LuaScriptError)

Instance Attribute Summary collapse

Attributes inherited from BaseJob

#client

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from BaseJob

#==, #hash, #klass, #queue

Constructor Details

#initialize(client, atts) ⇒ Job

Returns a new instance of Job.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/reqless/job.rb', line 134

# Builds a job from a string-keyed attribute hash.
#
# @param client the client object, forwarded to BaseJob together with the jid
# @param atts [Hash] string-keyed job attributes. Every key read with a
#   one-argument `fetch` below is required (KeyError otherwise);
#   'spawned_from_jid' is optional.
def initialize(client, atts)
  super(client, atts.fetch('jid'))
  %w[
    data failure dependencies dependents jid priority state tags throttles
    tracked
  ].each do |att|
    instance_variable_set(:"@#{att}", atts.fetch(att))
  end

  # Redis doesn't handle nil values so well, sometimes instead returning
  # false, so massage spawned_from_jid to consistently be nil or a jid.
  # (Done once here rather than on every iteration of the loop above.)
  @spawned_from_jid = atts.fetch('spawned_from_jid', nil) || nil

  # Parse the data string
  @data = JSON.parse(@data)

  @expires_at        = atts.fetch('expires')
  @klass_name        = atts.fetch('klass')
  @queue_name        = atts.fetch('queue')
  @worker_name       = atts.fetch('worker')
  @original_retries  = atts.fetch('retries')
  @retries_left      = atts.fetch('remaining')
  @raw_queue_history = atts.fetch('history')

  # This is a silly side-effect of Lua doing JSON serialization: empty
  # lists arrive as empty hashes, so normalize them back to arrays.
  @tags         = [] if @tags == {}
  @dependents   = [] if @dependents == {}
  @dependencies = [] if @dependencies == {}
  @state_changed = false
  # Per-event callback lists; a fresh array is created on first access.
  @before_callbacks = Hash.new { |h, k| h[k] = [] }
  @after_callbacks  = Hash.new { |h, k| h[k] = [] }
end

Instance Attribute Details

#data ⇒ Object

Returns the value of attribute data.



47
48
49
# File 'lib/reqless/job.rb', line 47

# Reader for @data.
#
# @return the value currently stored in @data
def data; @data; end

#dependencies ⇒ Object (readonly)

Returns the value of attribute dependencies.



44
45
46
# File 'lib/reqless/job.rb', line 44

# Reader for @dependencies.
#
# @return the value currently stored in @dependencies
def dependencies; @dependencies; end

#dependents ⇒ Object (readonly)

Returns the value of attribute dependents.



44
45
46
# File 'lib/reqless/job.rb', line 44

# Reader for @dependents.
#
# @return the value currently stored in @dependents
def dependents; @dependents; end

#expires_at ⇒ Object (readonly)

Returns the value of attribute expires_at.



43
44
45
# File 'lib/reqless/job.rb', line 43

# Reader for @expires_at.
#
# @return the value currently stored in @expires_at
def expires_at; @expires_at; end

#failure ⇒ Object (readonly)

Returns the value of attribute failure.



43
44
45
# File 'lib/reqless/job.rb', line 43

# Reader for @failure.
#
# @return the value currently stored in @failure
def failure; @failure; end

#jid ⇒ Object (readonly)

Returns the value of attribute jid.



43
44
45
# File 'lib/reqless/job.rb', line 43

# Reader for @jid.
#
# @return the value currently stored in @jid
def jid; @jid; end

#klass_name ⇒ Object (readonly)

Returns the value of attribute klass_name.



44
45
46
# File 'lib/reqless/job.rb', line 44

# Reader for @klass_name.
#
# @return the value currently stored in @klass_name
def klass_name; @klass_name; end

#original_retries ⇒ Object (readonly)

Returns the value of attribute original_retries.



45
46
47
# File 'lib/reqless/job.rb', line 45

# Reader for @original_retries.
#
# @return the value currently stored in @original_retries
def original_retries; @original_retries; end

#priority ⇒ Object

Returns the value of attribute priority.



47
48
49
# File 'lib/reqless/job.rb', line 47

# Reader for @priority.
#
# @return the value currently stored in @priority
def priority; @priority; end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



43
44
45
# File 'lib/reqless/job.rb', line 43

# Reader for @queue_name.
#
# @return the value currently stored in @queue_name
def queue_name; @queue_name; end

#raw_queue_history ⇒ Object (readonly)

Returns the value of attribute raw_queue_history.



45
46
47
# File 'lib/reqless/job.rb', line 45

# Reader for @raw_queue_history.
#
# @return the value currently stored in @raw_queue_history
def raw_queue_history; @raw_queue_history; end

#retries_left ⇒ Object (readonly)

Returns the value of attribute retries_left.



45
46
47
# File 'lib/reqless/job.rb', line 45

# Reader for @retries_left.
#
# @return the value currently stored in @retries_left
def retries_left; @retries_left; end

#spawned_from_jid ⇒ Object (readonly)

Returns the value of attribute spawned_from_jid.



43
44
45
# File 'lib/reqless/job.rb', line 43

# Reader for @spawned_from_jid.
#
# @return the value currently stored in @spawned_from_jid
def spawned_from_jid; @spawned_from_jid; end

#state ⇒ Object (readonly)

Returns the value of attribute state.



43
44
45
# File 'lib/reqless/job.rb', line 43

# Reader for @state.
#
# @return the value currently stored in @state
def state; @state; end

#state_changed ⇒ Object (readonly) Also known as: state_changed?

Returns the value of attribute state_changed.



46
47
48
# File 'lib/reqless/job.rb', line 46

# Reader for the @state_changed flag.
#
# @return the value currently stored in @state_changed
def state_changed; @state_changed; end

#tags ⇒ Object

Returns the value of attribute tags.



47
48
49
# File 'lib/reqless/job.rb', line 47

# Reader for @tags.
#
# @return the value currently stored in @tags
def tags; @tags; end

#throttles ⇒ Object

Returns the value of attribute throttles.



47
48
49
# File 'lib/reqless/job.rb', line 47

# Reader for @throttles.
#
# @return the value currently stored in @throttles
def throttles; @throttles; end

#tracked ⇒ Object (readonly)

Returns the value of attribute tracked.



44
45
46
# File 'lib/reqless/job.rb', line 44

# Reader for @tracked.
#
# @return the value currently stored in @tracked
def tracked; @tracked; end

#worker_name ⇒ Object (readonly)

Returns the value of attribute worker_name.



43
44
45
# File 'lib/reqless/job.rb', line 43

# Reader for @worker_name.
#
# @return the value currently stored in @worker_name
def worker_name; @worker_name; end

Class Method Details

.build(client, klass, attributes = {}) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/reqless/job.rb', line 88

# Constructs a job from mock defaults, overridden by +attributes+.
#
# @param client the client handed to the new job
# @param klass the job class; stored as its `to_s` under 'klass'
# @param attributes [Hash] overrides for the defaults below; keys are
#   passed through Reqless.stringify_hash_keys before merging
# @return the object returned by `new`
def self.build(client, klass, attributes = {})
  one_hour = 60 * 60
  job_attributes = {
    'jid'              => Reqless.generate_jid,
    'spawned_from_jid' => nil,
    'data'             => {},
    'klass'            => klass.to_s,
    'priority'         => 0,
    'tags'             => [],
    'worker'           => 'mock_worker',
    'expires'          => Time.now + one_hour,
    'state'            => 'running',
    'tracked'          => false,
    'queue'            => 'mock_queue',
    'retries'          => 5,
    'remaining'        => 5,
    'failure'          => {},
    'history'          => [],
    'dependencies'     => [],
    'dependents'       => [],
    'throttles'        => [],
  }
  job_attributes.merge!(Reqless.stringify_hash_keys(attributes))
  # The constructor expects 'data' as a JSON string, so serialize it here.
  job_attributes['data'] = JSON.dump(job_attributes['data'])
  new(client, job_attributes)
end

.build_opts_array(opts) ⇒ Object

Converts a hash of job options (as returned by job.to_hash) into the array format the reqless api expects.



116
117
118
119
120
121
122
123
124
125
# File 'lib/reqless/job.rb', line 116

# Flattens a symbol-keyed options hash into a positional argument array:
# JSON data, delay, then name/value pairs for priority, tags, retries,
# depends and throttles. Missing keys fall back to the defaults below.
#
# @param opts [Hash] options keyed by :data, :delay, :priority, :tags,
#   :retries, :depends, :throttles
# @return [Array] the flattened argument list
def self.build_opts_array(opts)
  [
    JSON.generate(opts.fetch(:data, {})),
    opts.fetch(:delay, 0),
    'priority', opts.fetch(:priority, 0),
    'tags', JSON.generate(opts.fetch(:tags, [])),
    'retries', opts.fetch(:retries, 5),
    'depends', JSON.generate(opts.fetch(:depends, [])),
    'throttles', JSON.generate(opts.fetch(:throttles, [])),
  ]
end

.middlewares_on(job_klass) ⇒ Object



127
128
129
130
131
132
# File 'lib/reqless/job.rb', line 127

# Lists the ancestors of +job_klass+'s singleton class (excluding the
# singleton class itself) that define an `around_perform` instance method.
#
# @param job_klass the job class to inspect
# @return [Array<Module>] matching ancestors, in ancestor order
def self.middlewares_on(job_klass)
  eigenclass = job_klass.singleton_class
  eigenclass.ancestors
            .reject { |mod| mod == eigenclass }
            .select { |mod| mod.method_defined?(:around_perform) }
end

Instance Method Details

#[](key) ⇒ Object



170
171
172
# File 'lib/reqless/job.rb', line 170

# Looks up +key+ in the job's data (@data).
#
# @param key the key to read
# @return whatever `@data[key]` returns
def [](key); @data[key]; end

#[]=(key, val) ⇒ Object



174
175
176
# File 'lib/reqless/job.rb', line 174

# Stores +val+ under +key+ in the job's data (@data).
#
# @param key the key to write
# @param val the value to store
def []=(key, val); @data[key] = val; end

#cancel ⇒ Object



326
327
328
329
330
# File 'lib/reqless/job.rb', line 326

# Issues 'job.cancel' for this job's jid through the client, wrapped in
# note_state_change for the :cancel event.
#
# @return whatever the client call returns
def cancel
  note_state_change(:cancel) { @client.call('job.cancel', @jid) }
end

#complete(nxt = nil, options = {}) ⇒ Object

Complete a job. Options include:

> next (String) the next queue

> delay (int) how long to delay it in the next queue



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/reqless/job.rb', line 310

# Completes the job, optionally moving it on to another queue.
#
# @param nxt [String, nil] the next queue; when nil the job is simply
#   completed via 'job.complete'
# @param options [Hash] used only when +nxt+ is given:
#   :delay (default 0) and :depends (default [])
# @return whatever the client call returns
# @raise [CantCompleteError] when the client raises Reqless::LuaScriptError
def complete(nxt = nil, options = {})
  note_state_change(:complete) do
    job_args = [@jid, @worker_name, @queue_name, JSON.dump(@data)]
    next @client.call('job.complete', *job_args) if nxt.nil?

    @client.call(
      'job.completeAndRequeue', *job_args,
      'next', nxt,
      'delay', options.fetch(:delay, 0),
      'depends', JSON.dump(options.fetch(:depends, []))
    )
  end
rescue Reqless::LuaScriptError => err
  raise CantCompleteError, err.message
end

#depend(*jids) ⇒ Object



362
363
364
# File 'lib/reqless/job.rb', line 362

# Issues 'job.addDependency' for this job with the given jids.
#
# @param jids [Array] jids passed through to the client call
# @return [Boolean] the truthiness of the client's reply
def depend(*jids)
  reply = @client.call('job.addDependency', @jid, *jids)
  reply ? true : false
end

#description ⇒ Object



182
183
184
# File 'lib/reqless/job.rb', line 182

# One-line summary of the job: class name, then jid, queue and state.
#
# @return [String] e.g. "MyJob (jid / queue / state)"
def description
  format('%s (%s / %s / %s)', @klass_name, @jid, @queue_name, @state)
end

#enqueue_opts ⇒ Hash

Extract the enqueue options from the job

Parameters:

  • options (Hash)

    a customizable set of options

Returns:

  • (Hash)

    options



258
259
260
261
262
263
264
265
266
267
# File 'lib/reqless/job.rb', line 258

# Extract the enqueue options from the job.
#
# `depends` is populated from `dependencies` (the jobs this job waits on).
# It previously used `dependents` (the jobs waiting on this one), which
# would make a requeued job depend on its own dependents.
#
# @return [Hash] symbol-keyed options: :retries, :priority, :depends,
#   :tags, :throttles, :data
def enqueue_opts
  {
    retries: original_retries,
    priority: priority,
    depends: dependencies,
    tags: tags,
    throttles: throttles,
    data: data,
  }
end

#fail(group, message) ⇒ Object

Fail a job



282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/reqless/job.rb', line 282

# Fails the job via 'job.fail', sending the failure group, message and the
# job's current data as JSON.
#
# @param group the failure group
# @param message the failure message
# @return the client's reply, or false when the reply is nil/false
# @raise [CantFailError] when the client raises Reqless::LuaScriptError
def fail(group, message)
  note_state_change(:fail) do
    reply = @client.call(
      'job.fail', @jid, @worker_name, group, message, JSON.dump(@data)
    )
    reply || false
  end
rescue Reqless::LuaScriptError => err
  raise CantFailError, err.message
end

#heartbeat ⇒ Object

Heartbeat a job



296
297
298
299
300
301
302
# File 'lib/reqless/job.rb', line 296

# Heartbeats the job via 'job.heartbeat', sending the current data as JSON,
# and stores the client's reply as the new @expires_at.
#
# @return the client's reply (the new @expires_at)
def heartbeat
  payload = JSON.dump(@data)
  @expires_at = @client.call('job.heartbeat', @jid, @worker_name, payload)
end

#history ⇒ Object



198
199
200
201
202
# File 'lib/reqless/job.rb', line 198

# Deprecated alias for #raw_queue_history. Emits a deprecation warning
# naming the first caller frame, then returns the raw history.
#
# @deprecated Use #raw_queue_history instead.
# @return the value of #raw_queue_history
def history
  # The trailing space after "use" matters: the two literals are joined
  # into a single message.
  warn 'WARNING: Reqless::Job#history is deprecated; use ' \
       "Reqless::Job#raw_queue_history instead; from:\n#{caller.first}"
  raw_queue_history
end

#initially_put_at ⇒ Object



218
219
220
# File 'lib/reqless/job.rb', line 218

# Memoized result of `history_timestamp('put', :min)`.
#
# @return the cached value; recomputed on each call while it is nil/false
def initially_put_at
  return @initially_put_at if @initially_put_at

  @initially_put_at = history_timestamp('put', :min)
end

#inspect ⇒ Object



186
187
188
# File 'lib/reqless/job.rb', line 186

# Wraps #description in a "<Reqless::Job ...>" marker.
#
# @return [String]
def inspect
  format('<Reqless::Job %s>', description)
end

#log(message, data = nil) ⇒ Object



374
375
376
377
378
379
380
# File 'lib/reqless/job.rb', line 374

# Records a message against this job via 'job.log'.
#
# @param message the message to record
# @param data optional payload; when truthy it is appended to the call as
#   JSON, otherwise no fourth argument is sent
# @return whatever the client call returns
def log(message, data = nil)
  call_args = ['job.log', @jid, message]
  call_args << JSON.dump(data) if data
  @client.call(*call_args)
end

#note_state_change(event) ⇒ Object



394
395
396
397
398
399
400
# File 'lib/reqless/job.rb', line 394

# Runs the block between the before- and after-callbacks registered for
# +event+, and marks the job's state as changed once the block returns.
#
# @param event the event key used to look up callbacks
# @yield the state-changing operation
# @return the block's return value
def note_state_change(event)
  run_hooks = ->(registry) { registry[event].each { |hook| hook.call(self) } }

  run_hooks.call(@before_callbacks)
  outcome = yield
  # Only reached when the block did not raise.
  @state_changed = true
  run_hooks.call(@after_callbacks)
  outcome
end

#perform ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/reqless/job.rb', line 59

# Runs the job by dispatching to its class.
#
# Fails the job (rather than raising) when the class cannot be resolved or
# has no `perform` method. When the class has middleware, dispatches
# through `around_perform`, which requires SupportsMiddleware to be the
# last entry returned by Job.middlewares_on.
#
# @return the result of the dispatch, or of #fail
# @raise [MiddlewareMisconfiguredError] when middleware is present but
#   SupportsMiddleware is not last in the chain
def perform
  # If we can't find the class, we should fail the job, not try to process
  begin
    klass
  rescue NameError
    return fail("#{queue_name}-NameError", "Cannot find #{klass_name}")
  end

  # log a real process executing job -- before we start processing
  log("started by pid:#{Process.pid}")

  middlewares = Job.middlewares_on(klass)

  if middlewares.last == SupportsMiddleware
    klass.around_perform(self)
  elsif middlewares.any?
    raise MiddlewareMisconfiguredError,
          "The middleware chain for #{klass} (#{middlewares.inspect}) is " \
          'misconfigured. Reqless::Job::SupportsMiddleware must be ' \
          'extended onto your job class first if you want to use any ' \
          'middleware.'
  elsif !klass.respond_to?(:perform)
    # If the klass doesn't have a :perform method, fail the job
    fail("#{queue_name}-method-missing",
         "#{klass_name} has no perform method")
  else
    klass.perform(self)
  end
end

#queue_history ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/reqless/job.rb', line 204

# Memoized copy of the raw queue history with every Numeric value
# converted to a UTC Time; all other values are kept as-is.
#
# @return [Array<Hash>] one converted hash per raw history event
def queue_history
  @queue_history ||= @raw_queue_history.map do |event|
    converted = event.map do |field, value|
      # The only Numeric (Integer or Float) values we get in the history
      # are timestamps
      [field, value.is_a?(Numeric) ? Time.at(value).utc : value]
    end
    converted.to_h
  end
end

#requeue(queue, opts = {}) ⇒ Object Also known as: move

Move this from its current queue into another



270
271
272
273
274
275
276
# File 'lib/reqless/job.rb', line 270

# Moves this job into +queue+ via 'job.requeue', using the job's own
# enqueue options overridden by +opts+.
#
# @param queue the destination queue
# @param opts [Hash] overrides merged over #enqueue_opts
# @return whatever the client call returns
def requeue(queue, opts = {})
  note_state_change(:requeue) do
    worker = @client.worker_name
    merged_opts = enqueue_opts.merge!(opts)
    opts_array = self.class.build_opts_array(merged_opts)
    @client.call('job.requeue', worker, queue, @jid, @klass_name, *opts_array)
  end
end

#retry(delay = 0, group = nil, message = nil) ⇒ Object



348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/reqless/job.rb', line 348

# Retries the job via 'job.retry'.
#
# @param delay delay passed to the client call (default 0)
# @param group optional failure group; when nil, neither group nor
#   message is sent
# @param message optional failure message, sent only alongside +group+
# @return the client's reply, or false when the reply is nil/false
def retry(delay = 0, group = nil, message = nil)
  note_state_change(:retry) do
    call_args = ['job.retry', @jid, @queue_name, @worker_name, delay]
    call_args.push(group, message) unless group.nil?
    @client.call(*call_args) || false
  end
end

#spawned_from ⇒ Object



222
223
224
225
# File 'lib/reqless/job.rb', line 222

# Looks up the job identified by @spawned_from_jid through the client's
# `jobs` collection, memoizing the result.
#
# @return the looked-up job, or nil when @spawned_from_jid is nil
def spawned_from
  @spawned_from ||= @client.jobs[@spawned_from_jid] unless @spawned_from_jid.nil?
end

#tag(*tags) ⇒ Object



340
341
342
# File 'lib/reqless/job.rb', line 340

# Issues 'job.addTag' for this job with the given tags.
#
# @param tags [Array] tags passed through to the client call
# @return the client's reply parsed as JSON
def tag(*tags)
  raw_reply = @client.call('job.addTag', @jid, *tags)
  JSON.parse(raw_reply)
end

#throttle_objects ⇒ Object



194
195
196
# File 'lib/reqless/job.rb', line 194

# Wraps each entry of #throttles in a Throttle bound to this job's client.
#
# @return [Array<Throttle>] one Throttle per entry, in the same order
def throttle_objects
  throttles.map do |throttle_name|
    Throttle.new(throttle_name, client)
  end
end

#timeout ⇒ Object



370
371
372
# File 'lib/reqless/job.rb', line 370

# Issues 'job.timeout' for this job's jid through the client.
#
# @return whatever the client call returns
def timeout; @client.call('job.timeout', @jid); end

#to_hash ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/reqless/job.rb', line 227

# Snapshot of the job's attributes as a symbol-keyed hash. Each key maps
# to the reader of the same name, except :history, which is filled from
# #raw_queue_history.
#
# @return [Hash{Symbol => Object}]
def to_hash
  readers_by_key = {
    jid: :jid,
    spawned_from_jid: :spawned_from_jid,
    expires_at: :expires_at,
    state: :state,
    queue_name: :queue_name,
    history: :raw_queue_history,
    worker_name: :worker_name,
    failure: :failure,
    klass_name: :klass_name,
    tracked: :tracked,
    dependencies: :dependencies,
    dependents: :dependents,
    original_retries: :original_retries,
    retries_left: :retries_left,
    data: :data,
    priority: :priority,
    tags: :tags,
    throttles: :throttles,
  }
  readers_by_key.transform_values { |reader| send(reader) }
end

#to_s ⇒ Object



178
179
180
# File 'lib/reqless/job.rb', line 178

# String form of the job; identical to #inspect.
#
# @return the result of #inspect
def to_s; inspect; end

#track ⇒ Object



332
333
334
# File 'lib/reqless/job.rb', line 332

# Issues 'job.track' for this job's jid through the client.
#
# @return whatever the client call returns
def track; @client.call('job.track', @jid); end

#ttl ⇒ Object



190
191
192
# File 'lib/reqless/job.rb', line 190

# Seconds remaining until the job expires (negative once expired).
#
# @expires_at may be a numeric epoch timestamp or a Time (Job.build stores
# `Time.now + 3600` there). A Time is converted to epoch seconds first,
# because `Time - Float` yields another Time rather than a duration.
#
# @return [Numeric] @expires_at minus the current epoch time in seconds
def ttl
  expires = @expires_at.is_a?(Time) ? @expires_at.to_f : @expires_at
  expires - Time.now.to_f
end

#undepend(*jids) ⇒ Object



366
367
368
# File 'lib/reqless/job.rb', line 366

# Issues 'job.removeDependency' for this job with the given jids.
#
# @param jids [Array] jids passed through to the client call
# @return [Boolean] the truthiness of the client's reply
def undepend(*jids)
  reply = @client.call('job.removeDependency', @jid, *jids)
  reply ? true : false
end

#untag(*tags) ⇒ Object



344
345
346
# File 'lib/reqless/job.rb', line 344

# Issues 'job.removeTag' for this job with the given tags.
#
# @param tags [Array] tags passed through to the client call
# @return the client's reply parsed as JSON
def untag(*tags)
  raw_reply = @client.call('job.removeTag', @jid, *tags)
  JSON.parse(raw_reply)
end

#untrack ⇒ Object



336
337
338
# File 'lib/reqless/job.rb', line 336

# Issues 'job.untrack' for this job's jid through the client.
#
# @return whatever the client call returns
def untrack; @client.call('job.untrack', @jid); end