Class: Qless::Job

Inherits:
BaseJob show all
Defined in:
lib/qless/job.rb

Overview

A Qless job

Defined Under Namespace

Modules: SupportsMiddleware

Constant Summary collapse

MiddlewareMisconfiguredError =
Class.new(StandardError)
CantFailError =
Class.new(Qless::LuaScriptError)
CantCompleteError =
Class.new(Qless::LuaScriptError)

Instance Attribute Summary collapse

Attributes inherited from BaseJob

#client

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from BaseJob

#==, #hash, #klass, #queue

Constructor Details

#initialize(client, atts) ⇒ Job

Returns a new instance of Job.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/qless/job.rb', line 119

def initialize(client, atts)
  super(client, atts.fetch('jid'))
  %w{jid data priority tags state tracked
     failure dependencies dependents spawned_from_jid}.each do |att|
    instance_variable_set(:"@#{att}", atts.fetch(att))
  end

  # Parse the data string
  @data = JSON.parse(@data)

  @expires_at        = atts.fetch('expires')
  @klass_name        = atts.fetch('klass')
  @queue_name        = atts.fetch('queue')
  @worker_name       = atts.fetch('worker')
  @original_retries  = atts.fetch('retries')
  @retries_left      = atts.fetch('remaining')
  @raw_queue_history = atts.fetch('history')

  # This is a silly side-effect of Lua doing JSON parsing
  @tags         = [] if @tags == {}
  @dependents   = [] if @dependents == {}
  @dependencies = [] if @dependencies == {}
  @state_changed = false
  @before_callbacks = Hash.new { |h, k| h[k] = [] }
  @after_callbacks  = Hash.new { |h, k| h[k] = [] }
end

Instance Attribute Details

#dataObject

Returns the value of attribute data.



47
48
49
# File 'lib/qless/job.rb', line 47

def data
  @data
end

#dependenciesObject (readonly)

Returns the value of attribute dependencies.



44
45
46
# File 'lib/qless/job.rb', line 44

def dependencies
  @dependencies
end

#dependentsObject (readonly)

Returns the value of attribute dependents.



44
45
46
# File 'lib/qless/job.rb', line 44

def dependents
  @dependents
end

#expires_atObject (readonly)

Returns the value of attribute expires_at.



43
44
45
# File 'lib/qless/job.rb', line 43

def expires_at
  @expires_at
end

#failureObject (readonly)

Returns the value of attribute failure.



43
44
45
# File 'lib/qless/job.rb', line 43

def failure
  @failure
end

#jidObject (readonly)

Returns the value of attribute jid.



43
44
45
# File 'lib/qless/job.rb', line 43

def jid
  @jid
end

#klass_nameObject (readonly)

Returns the value of attribute klass_name.



44
45
46
# File 'lib/qless/job.rb', line 44

def klass_name
  @klass_name
end

#original_retriesObject (readonly)

Returns the value of attribute original_retries.



45
46
47
# File 'lib/qless/job.rb', line 45

def original_retries
  @original_retries
end

#priorityObject

Returns the value of attribute priority.



47
48
49
# File 'lib/qless/job.rb', line 47

def priority
  @priority
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



43
44
45
# File 'lib/qless/job.rb', line 43

def queue_name
  @queue_name
end

#raw_queue_historyObject (readonly)

Returns the value of attribute raw_queue_history.



45
46
47
# File 'lib/qless/job.rb', line 45

def raw_queue_history
  @raw_queue_history
end

#retries_leftObject (readonly)

Returns the value of attribute retries_left.



45
46
47
# File 'lib/qless/job.rb', line 45

def retries_left
  @retries_left
end

#spawned_from_jidObject (readonly)

Returns the value of attribute spawned_from_jid.



43
44
45
# File 'lib/qless/job.rb', line 43

def spawned_from_jid
  @spawned_from_jid
end

#stateObject (readonly)

Returns the value of attribute state.



43
44
45
# File 'lib/qless/job.rb', line 43

def state
  @state
end

#state_changedObject (readonly) Also known as: state_changed?

Returns the value of attribute state_changed.



46
47
48
# File 'lib/qless/job.rb', line 46

def state_changed
  @state_changed
end

#tagsObject

Returns the value of attribute tags.



47
48
49
# File 'lib/qless/job.rb', line 47

def tags
  @tags
end

#trackedObject (readonly)

Returns the value of attribute tracked.



44
45
46
# File 'lib/qless/job.rb', line 44

def tracked
  @tracked
end

#worker_nameObject (readonly)

Returns the value of attribute worker_name.



43
44
45
# File 'lib/qless/job.rb', line 43

def worker_name
  @worker_name
end

Class Method Details

.build(client, klass, attributes = {}) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/qless/job.rb', line 87

def self.build(client, klass, attributes = {})
  defaults = {
    'jid'              => Qless.generate_jid,
    'spawned_from_jid' => nil,
    'data'             => {},
    'klass'            => klass.to_s,
    'priority'         => 0,
    'tags'             => [],
    'worker'           => 'mock_worker',
    'expires'          => Time.now + (60 * 60), # an hour from now
    'state'            => 'running',
    'tracked'          => false,
    'queue'            => 'mock_queue',
    'retries'          => 5,
    'remaining'        => 5,
    'failure'          => {},
    'history'          => [],
    'dependencies'     => [],
    'dependents'       => []
  }
  attributes = defaults.merge(Qless.stringify_hash_keys(attributes))
  attributes['data'] = JSON.dump(attributes['data'])
  new(client, attributes)
end

.middlewares_on(job_klass) ⇒ Object



112
113
114
115
116
117
# File 'lib/qless/job.rb', line 112

def self.middlewares_on(job_klass)
  singleton_klass = job_klass.singleton_class
  singleton_klass.ancestors.select do |ancestor|
    ancestor != singleton_klass && ancestor.method_defined?(:around_perform)
  end
end

Instance Method Details

#[](key) ⇒ Object



150
151
152
# File 'lib/qless/job.rb', line 150

def [](key)
  @data[key]
end

#[]=(key, val) ⇒ Object



154
155
156
# File 'lib/qless/job.rb', line 154

def []=(key, val)
  @data[key] = val
end

#cancelObject



295
296
297
298
299
# File 'lib/qless/job.rb', line 295

def cancel
  note_state_change :cancel do
    @client.call('cancel', @jid)
  end
end

#complete(nxt = nil, options = {}) ⇒ Object

Complete a job Options include

> next (String) the next queue

> delay (int) how long to delay it in the next queue



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/qless/job.rb', line 279

def complete(nxt = nil, options = {})
  note_state_change :complete do
    if nxt.nil?
      @client.call(
        'complete', @jid, @worker_name, @queue_name, JSON.dump(@data))
    else
      @client.call('complete', @jid, @worker_name, @queue_name,
                   JSON.dump(@data), 'next', nxt, 'delay',
                   options.fetch(:delay, 0), 'depends',
                   JSON.dump(options.fetch(:depends, [])))
    end
  end
rescue Qless::LuaScriptError => err
  raise CantCompleteError.new(err.message)
end

#depend(*jids) ⇒ Object



331
332
333
# File 'lib/qless/job.rb', line 331

def depend(*jids)
  !!@client.call('depends', @jid, 'on', *jids)
end

#descriptionObject



162
163
164
# File 'lib/qless/job.rb', line 162

def description
  "#{@klass_name} (#{@jid} / #{@queue_name} / #{@state} / #{@data})"
end

#fail(group, message) ⇒ Object

Fail a job



251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/qless/job.rb', line 251

def fail(group, message)
  note_state_change :fail do
    @client.call(
      'fail',
      @jid,
      @worker_name,
      group, message,
      JSON.dump(@data)) || false
  end
rescue Qless::LuaScriptError => err
  raise CantFailError.new(err.message)
end

#heartbeatObject

Heartbeat a job



265
266
267
268
269
270
271
# File 'lib/qless/job.rb', line 265

def heartbeat
  @expires_at = @client.call(
    'heartbeat',
    @jid,
    @worker_name,
    JSON.dump(@data))
end

#historyObject



178
179
180
181
182
# File 'lib/qless/job.rb', line 178

def history
  warn 'WARNING: Qless::Job#history is deprecated; use' +
       "Qless::Job#raw_queue_history instead; from:\n#{caller.first}"
  raw_queue_history
end

#initially_put_atObject



198
199
200
# File 'lib/qless/job.rb', line 198

def initially_put_at
  @initially_put_at ||= history_timestamp('put', :min)
end

#inspectObject



166
167
168
# File 'lib/qless/job.rb', line 166

def inspect
  "<Qless::Job #{description}>"
end

#log(message, data = nil) ⇒ Object



343
344
345
346
347
348
349
# File 'lib/qless/job.rb', line 343

def log(message, data = nil)
  if data
    @client.call('log', @jid, message, JSON.dump(data))
  else
    @client.call('log', @jid, message)
  end
end

#note_state_change(event) ⇒ Object



363
364
365
366
367
368
369
# File 'lib/qless/job.rb', line 363

def note_state_change(event)
  @before_callbacks[event].each { |blk| blk.call(self) }
  result = yield
  @state_changed = true
  @after_callbacks[event].each { |blk| blk.call(self) }
  result
end

#performObject



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/qless/job.rb', line 58

def perform
  # If we can't find the class, we should fail the job, not try to process
  begin
    klass
  rescue NameError
    return fail("#{queue_name}-NameError", "Cannot find #{klass_name}")
  end

  # log a real process executing job -- before we start processing
  log("started by pid:#{Process.pid}")

  middlewares = Job.middlewares_on(klass)

  if middlewares.last == SupportsMiddleware
    klass.around_perform(self)
  elsif middlewares.any?
    raise MiddlewareMisconfiguredError, 'The middleware chain for ' +
          "#{klass} (#{middlewares.inspect}) is misconfigured." +
          'Qless::Job::SupportsMiddleware must be extended onto your job' +
          'class first if you want to use any middleware.'
  elsif !klass.respond_to?(:perform)
    # If the klass doesn't have a :perform method, we should raise an error
    fail("#{queue_name}-method-missing",
         "#{klass_name} has no perform method")
  else
    klass.perform(self)
  end
end

#queue_historyObject



184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/qless/job.rb', line 184

def queue_history
  @queue_history ||= @raw_queue_history.map do |history_event|
    history_event.each_with_object({}) do |(key, value), hash|
      # The only Numeric (Integer or Float) values we get in the history
      # are timestamps
      if value.is_a?(Numeric)
        hash[key] = Time.at(value).utc
      else
        hash[key] = value
      end
    end
  end
end

#reconnect_to_redisObject



174
175
176
# File 'lib/qless/job.rb', line 174

def reconnect_to_redis
  @client.redis.client.reconnect
end

#requeue(queue, opts = {}) ⇒ Object Also known as: move

Move this from it’s current queue into another



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/qless/job.rb', line 229

def requeue(queue, opts = {})
  queue_name = case queue
                 when String, Symbol then queue
                 else queue.name
               end

  note_state_change :requeue do
    @client.call('requeue', @client.worker_name, queue_name, @jid, @klass_name,
                 JSON.dump(opts.fetch(:data, @data)),
                 opts.fetch(:delay, 0),
                 'priority', opts.fetch(:priority, @priority),
                 'tags', JSON.dump(opts.fetch(:tags, @tags)),
                 'retries', opts.fetch(:retries, @original_retries),
                 'depends', JSON.dump(opts.fetch(:depends, @dependencies))
    )
  end
end

#retry(delay = 0, group = nil, message = nil) ⇒ Object



317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/qless/job.rb', line 317

def retry(delay = 0, group = nil, message = nil)
  note_state_change :retry do
    if group.nil?
      results = @client.call(
        'retry', @jid, @queue_name, @worker_name, delay)
      results.nil? ? false : results
    else
      results = @client.call(
        'retry', @jid, @queue_name, @worker_name, delay, group, message)
      results.nil? ? false : results
    end
  end
end

#spawned_fromObject



202
203
204
# File 'lib/qless/job.rb', line 202

def spawned_from
  @spawned_from ||= @client.jobs[@spawned_from_jid]
end

#tag(*tags) ⇒ Object



309
310
311
# File 'lib/qless/job.rb', line 309

def tag(*tags)
  @client.call('tag', 'add', @jid, *tags)
end

#timeoutObject



339
340
341
# File 'lib/qless/job.rb', line 339

def timeout
  @client.call('timeout', @jid)
end

#to_hashObject



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/qless/job.rb', line 206

def to_hash
  {
    jid: jid,
    spawned_from_jid: spawned_from_jid,
    expires_at: expires_at,
    state: state,
    queue_name: queue_name,
    history: raw_queue_history,
    worker_name: worker_name,
    failure: failure,
    klass_name: klass_name,
    tracked: tracked,
    dependencies: dependencies,
    dependents: dependents,
    original_retries: original_retries,
    retries_left: retries_left,
    data: data,
    priority: priority,
    tags: tags
  }
end

#to_sObject



158
159
160
# File 'lib/qless/job.rb', line 158

def to_s
  inspect
end

#trackObject



301
302
303
# File 'lib/qless/job.rb', line 301

def track
  @client.call('track', 'track', @jid)
end

#ttlObject



170
171
172
# File 'lib/qless/job.rb', line 170

def ttl
  @expires_at - Time.now.to_f
end

#undepend(*jids) ⇒ Object



335
336
337
# File 'lib/qless/job.rb', line 335

def undepend(*jids)
  !!@client.call('depends', @jid, 'off', *jids)
end

#untag(*tags) ⇒ Object



313
314
315
# File 'lib/qless/job.rb', line 313

def untag(*tags)
  @client.call('tag', 'remove', @jid, *tags)
end

#untrackObject



305
306
307
# File 'lib/qless/job.rb', line 305

def untrack
  @client.call('track', 'untrack', @jid)
end