Class: Qless::Job
Overview
A Qless job
Defined Under Namespace
Modules: SupportsMiddleware
Constant Summary collapse
- MiddlewareMisconfiguredError =
Class.new(StandardError)
- CantFailError =
Class.new(Qless::LuaScriptError)
- CantCompleteError =
Class.new(Qless::LuaScriptError)
Instance Attribute Summary collapse
-
#data ⇒ Object
Returns the value of attribute data.
-
#dependencies ⇒ Object
readonly
Returns the value of attribute dependencies.
-
#dependents ⇒ Object
readonly
Returns the value of attribute dependents.
-
#expires_at ⇒ Object
readonly
Returns the value of attribute expires_at.
-
#failure ⇒ Object
readonly
Returns the value of attribute failure.
-
#jid ⇒ Object
readonly
Returns the value of attribute jid.
-
#klass_name ⇒ Object
readonly
Returns the value of attribute klass_name.
-
#original_retries ⇒ Object
readonly
Returns the value of attribute original_retries.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#raw_queue_history ⇒ Object
readonly
Returns the value of attribute raw_queue_history.
-
#retries_left ⇒ Object
readonly
Returns the value of attribute retries_left.
-
#spawned_from_jid ⇒ Object
readonly
Returns the value of attribute spawned_from_jid.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#state_changed ⇒ Object
(also: #state_changed?)
readonly
Returns the value of attribute state_changed.
-
#tags ⇒ Object
Returns the value of attribute tags.
-
#tracked ⇒ Object
readonly
Returns the value of attribute tracked.
-
#worker_name ⇒ Object
readonly
Returns the value of attribute worker_name.
Attributes inherited from BaseJob
Class Method Summary collapse
Instance Method Summary collapse
- #[](key) ⇒ Object
- #[]=(key, val) ⇒ Object
- #cancel ⇒ Object
-
#complete(nxt = nil, options = {}) ⇒ Object
Complete a job. Options include: next (String), the next queue; delay (int), how long to delay it in the next queue.
- #depend(*jids) ⇒ Object
- #description ⇒ Object
-
#fail(group, message) ⇒ Object
Fail a job.
-
#heartbeat ⇒ Object
Heartbeat a job.
- #history ⇒ Object
-
#initialize(client, atts) ⇒ Job
constructor
A new instance of Job.
- #initially_put_at ⇒ Object
- #inspect ⇒ Object
- #log(message, data = nil) ⇒ Object
- #note_state_change(event) ⇒ Object
- #perform ⇒ Object
- #queue_history ⇒ Object
- #reconnect_to_redis ⇒ Object
-
#requeue(queue, opts = {}) ⇒ Object
(also: #move)
Move this job from its current queue into another.
- #retry(delay = 0, group = nil, message = nil) ⇒ Object
- #spawned_from ⇒ Object
- #tag(*tags) ⇒ Object
- #timeout ⇒ Object
- #to_hash ⇒ Object
- #to_s ⇒ Object
- #track ⇒ Object
- #ttl ⇒ Object
- #undepend(*jids) ⇒ Object
- #untag(*tags) ⇒ Object
- #untrack ⇒ Object
Methods inherited from BaseJob
Constructor Details
#initialize(client, atts) ⇒ Job
Returns a new instance of Job.
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/qless/job.rb', line 119 def initialize(client, atts) super(client, atts.fetch('jid')) %w{jid data priority tags state tracked failure dependencies dependents spawned_from_jid}.each do |att| instance_variable_set(:"@#{att}", atts.fetch(att)) end # Parse the data string @data = JSON.parse(@data) @expires_at = atts.fetch('expires') @klass_name = atts.fetch('klass') @queue_name = atts.fetch('queue') @worker_name = atts.fetch('worker') @original_retries = atts.fetch('retries') @retries_left = atts.fetch('remaining') @raw_queue_history = atts.fetch('history') # This is a silly side-effect of Lua doing JSON parsing @tags = [] if @tags == {} @dependents = [] if @dependents == {} @dependencies = [] if @dependencies == {} @state_changed = false @before_callbacks = Hash.new { |h, k| h[k] = [] } @after_callbacks = Hash.new { |h, k| h[k] = [] } end |
Instance Attribute Details
#data ⇒ Object
Returns the value of attribute data.
47 48 49 |
# File 'lib/qless/job.rb', line 47 def data @data end |
#dependencies ⇒ Object (readonly)
Returns the value of attribute dependencies.
44 45 46 |
# File 'lib/qless/job.rb', line 44 def dependencies @dependencies end |
#dependents ⇒ Object (readonly)
Returns the value of attribute dependents.
44 45 46 |
# File 'lib/qless/job.rb', line 44 def dependents @dependents end |
#expires_at ⇒ Object (readonly)
Returns the value of attribute expires_at.
43 44 45 |
# File 'lib/qless/job.rb', line 43 def expires_at @expires_at end |
#failure ⇒ Object (readonly)
Returns the value of attribute failure.
43 44 45 |
# File 'lib/qless/job.rb', line 43 def failure @failure end |
#jid ⇒ Object (readonly)
Returns the value of attribute jid.
43 44 45 |
# File 'lib/qless/job.rb', line 43 def jid @jid end |
#klass_name ⇒ Object (readonly)
Returns the value of attribute klass_name.
44 45 46 |
# File 'lib/qless/job.rb', line 44 def klass_name @klass_name end |
#original_retries ⇒ Object (readonly)
Returns the value of attribute original_retries.
45 46 47 |
# File 'lib/qless/job.rb', line 45 def original_retries @original_retries end |
#priority ⇒ Object
Returns the value of attribute priority.
47 48 49 |
# File 'lib/qless/job.rb', line 47 def priority @priority end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
43 44 45 |
# File 'lib/qless/job.rb', line 43 def queue_name @queue_name end |
#raw_queue_history ⇒ Object (readonly)
Returns the value of attribute raw_queue_history.
45 46 47 |
# File 'lib/qless/job.rb', line 45 def raw_queue_history @raw_queue_history end |
#retries_left ⇒ Object (readonly)
Returns the value of attribute retries_left.
45 46 47 |
# File 'lib/qless/job.rb', line 45 def retries_left @retries_left end |
#spawned_from_jid ⇒ Object (readonly)
Returns the value of attribute spawned_from_jid.
43 44 45 |
# File 'lib/qless/job.rb', line 43 def spawned_from_jid @spawned_from_jid end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
43 44 45 |
# File 'lib/qless/job.rb', line 43 def state @state end |
#state_changed ⇒ Object (readonly) Also known as: state_changed?
Returns the value of attribute state_changed.
46 47 48 |
# File 'lib/qless/job.rb', line 46 def state_changed @state_changed end |
#tags ⇒ Object
Returns the value of attribute tags.
47 48 49 |
# File 'lib/qless/job.rb', line 47 def tags @tags end |
#tracked ⇒ Object (readonly)
Returns the value of attribute tracked.
44 45 46 |
# File 'lib/qless/job.rb', line 44 def tracked @tracked end |
#worker_name ⇒ Object (readonly)
Returns the value of attribute worker_name.
43 44 45 |
# File 'lib/qless/job.rb', line 43 def worker_name @worker_name end |
Class Method Details
.build(client, klass, attributes = {}) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/qless/job.rb', line 87 def self.build(client, klass, attributes = {}) defaults = { 'jid' => Qless.generate_jid, 'spawned_from_jid' => nil, 'data' => {}, 'klass' => klass.to_s, 'priority' => 0, 'tags' => [], 'worker' => 'mock_worker', 'expires' => Time.now + (60 * 60), # an hour from now 'state' => 'running', 'tracked' => false, 'queue' => 'mock_queue', 'retries' => 5, 'remaining' => 5, 'failure' => {}, 'history' => [], 'dependencies' => [], 'dependents' => [] } attributes = defaults.merge(Qless.stringify_hash_keys(attributes)) attributes['data'] = JSON.dump(attributes['data']) new(client, attributes) end |
.middlewares_on(job_klass) ⇒ Object
112 113 114 115 116 117 |
# File 'lib/qless/job.rb', line 112 def self.middlewares_on(job_klass) singleton_klass = job_klass.singleton_class singleton_klass.ancestors.select do |ancestor| ancestor != singleton_klass && ancestor.method_defined?(:around_perform) end end |
Instance Method Details
#[](key) ⇒ Object
150 151 152 |
# File 'lib/qless/job.rb', line 150 def [](key) @data[key] end |
#[]=(key, val) ⇒ Object
154 155 156 |
# File 'lib/qless/job.rb', line 154 def []=(key, val) @data[key] = val end |
#cancel ⇒ Object
295 296 297 298 299 |
# File 'lib/qless/job.rb', line 295 def cancel note_state_change :cancel do @client.call('cancel', @jid) end end |
#complete(nxt = nil, options = {}) ⇒ Object
Complete a job. Options include:
=> next (String) the next queue
=> delay (int) how long to delay it in the next queue
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/qless/job.rb', line 279 def complete(nxt = nil, options = {}) note_state_change :complete do if nxt.nil? @client.call( 'complete', @jid, @worker_name, @queue_name, JSON.dump(@data)) else @client.call('complete', @jid, @worker_name, @queue_name, JSON.dump(@data), 'next', nxt, 'delay', options.fetch(:delay, 0), 'depends', JSON.dump(options.fetch(:depends, []))) end end rescue Qless::LuaScriptError => err raise CantCompleteError.new(err.message) end |
#depend(*jids) ⇒ Object
331 332 333 |
# File 'lib/qless/job.rb', line 331 def depend(*jids) !!@client.call('depends', @jid, 'on', *jids) end |
#description ⇒ Object
162 163 164 |
# File 'lib/qless/job.rb', line 162 def description "#{@klass_name} (#{@jid} / #{@queue_name} / #{@state} / #{@data})" end |
#fail(group, message) ⇒ Object
Fail a job
251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/qless/job.rb', line 251 def fail(group, message) note_state_change :fail do @client.call( 'fail', @jid, @worker_name, group, message, JSON.dump(@data)) || false end rescue Qless::LuaScriptError => err raise CantFailError.new(err.message) end |
#heartbeat ⇒ Object
Heartbeat a job
265 266 267 268 269 270 271 |
# File 'lib/qless/job.rb', line 265 def heartbeat @expires_at = @client.call( 'heartbeat', @jid, @worker_name, JSON.dump(@data)) end |
#history ⇒ Object
178 179 180 181 182 |
# File 'lib/qless/job.rb', line 178 def history warn 'WARNING: Qless::Job#history is deprecated; use' + "Qless::Job#raw_queue_history instead; from:\n#{caller.first}" raw_queue_history end |
#initially_put_at ⇒ Object
198 199 200 |
# File 'lib/qless/job.rb', line 198 def initially_put_at @initially_put_at ||= history_timestamp('put', :min) end |
#inspect ⇒ Object
166 167 168 |
# File 'lib/qless/job.rb', line 166 def inspect "<Qless::Job #{description}>" end |
#log(message, data = nil) ⇒ Object
343 344 345 346 347 348 349 |
# File 'lib/qless/job.rb', line 343 def log(message, data = nil) if data @client.call('log', @jid, message, JSON.dump(data)) else @client.call('log', @jid, message) end end |
#note_state_change(event) ⇒ Object
363 364 365 366 367 368 369 |
# File 'lib/qless/job.rb', line 363 def note_state_change(event) @before_callbacks[event].each { |blk| blk.call(self) } result = yield @state_changed = true @after_callbacks[event].each { |blk| blk.call(self) } result end |
#perform ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/qless/job.rb', line 58 def perform # If we can't find the class, we should fail the job, not try to process begin klass rescue NameError return fail("#{queue_name}-NameError", "Cannot find #{klass_name}") end # log a real process executing job -- before we start processing log("started by pid:#{Process.pid}") middlewares = Job.middlewares_on(klass) if middlewares.last == SupportsMiddleware klass.around_perform(self) elsif middlewares.any? raise MiddlewareMisconfiguredError, 'The middleware chain for ' + "#{klass} (#{middlewares.inspect}) is misconfigured." + 'Qless::Job::SupportsMiddleware must be extended onto your job' + 'class first if you want to use any middleware.' elsif !klass.respond_to?(:perform) # If the klass doesn't have a :perform method, we should raise an error fail("#{queue_name}-method-missing", "#{klass_name} has no perform method") else klass.perform(self) end end |
#queue_history ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/qless/job.rb', line 184 def queue_history @queue_history ||= @raw_queue_history.map do |history_event| history_event.each_with_object({}) do |(key, value), hash| # The only Numeric (Integer or Float) values we get in the history # are timestamps if value.is_a?(Numeric) hash[key] = Time.at(value).utc else hash[key] = value end end end end |
#reconnect_to_redis ⇒ Object
174 175 176 |
# File 'lib/qless/job.rb', line 174 def reconnect_to_redis @client.redis.client.reconnect end |
#requeue(queue, opts = {}) ⇒ Object Also known as: move
Move this job from its current queue into another
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/qless/job.rb', line 229 def requeue(queue, opts = {}) queue_name = case queue when String, Symbol then queue else queue.name end note_state_change :requeue do @client.call('requeue', @client.worker_name, queue_name, @jid, @klass_name, JSON.dump(opts.fetch(:data, @data)), opts.fetch(:delay, 0), 'priority', opts.fetch(:priority, @priority), 'tags', JSON.dump(opts.fetch(:tags, @tags)), 'retries', opts.fetch(:retries, @original_retries), 'depends', JSON.dump(opts.fetch(:depends, @dependencies)) ) end end |
#retry(delay = 0, group = nil, message = nil) ⇒ Object
317 318 319 320 321 322 323 324 325 326 327 328 329 |
# File 'lib/qless/job.rb', line 317 def retry(delay = 0, group = nil, message = nil) note_state_change :retry do if group.nil? results = @client.call( 'retry', @jid, @queue_name, @worker_name, delay) results.nil? ? false : results else results = @client.call( 'retry', @jid, @queue_name, @worker_name, delay, group, message) results.nil? ? false : results end end end |
#spawned_from ⇒ Object
202 203 204 |
# File 'lib/qless/job.rb', line 202 def spawned_from @spawned_from ||= @client.jobs[@spawned_from_jid] end |
#tag(*tags) ⇒ Object
309 310 311 |
# File 'lib/qless/job.rb', line 309 def tag(*tags) @client.call('tag', 'add', @jid, *tags) end |
#timeout ⇒ Object
339 340 341 |
# File 'lib/qless/job.rb', line 339 def timeout @client.call('timeout', @jid) end |
#to_hash ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/qless/job.rb', line 206 def to_hash { jid: jid, spawned_from_jid: spawned_from_jid, expires_at: expires_at, state: state, queue_name: queue_name, history: raw_queue_history, worker_name: worker_name, failure: failure, klass_name: klass_name, tracked: tracked, dependencies: dependencies, dependents: dependents, original_retries: original_retries, retries_left: retries_left, data: data, priority: priority, tags: tags } end |
#to_s ⇒ Object
158 159 160 |
# File 'lib/qless/job.rb', line 158 def to_s inspect end |
#track ⇒ Object
301 302 303 |
# File 'lib/qless/job.rb', line 301 def track @client.call('track', 'track', @jid) end |
#ttl ⇒ Object
170 171 172 |
# File 'lib/qless/job.rb', line 170 def ttl @expires_at - Time.now.to_f end |
#undepend(*jids) ⇒ Object
335 336 337 |
# File 'lib/qless/job.rb', line 335 def undepend(*jids) !!@client.call('depends', @jid, 'off', *jids) end |
#untag(*tags) ⇒ Object
313 314 315 |
# File 'lib/qless/job.rb', line 313 def untag(*tags) @client.call('tag', 'remove', @jid, *tags) end |
#untrack ⇒ Object
305 306 307 |
# File 'lib/qless/job.rb', line 305 def untrack @client.call('track', 'untrack', @jid) end |