Class: Reqless::Job
Overview
A Reqless job
Defined Under Namespace
Modules: SupportsMiddleware
Constant Summary collapse
- MiddlewareMisconfiguredError =
Class.new(StandardError)
- CantFailError =
Class.new(Reqless::LuaScriptError)
- CantCompleteError =
Class.new(Reqless::LuaScriptError)
Instance Attribute Summary collapse
-
#data ⇒ Object
Returns the value of attribute data.
-
#dependencies ⇒ Object
readonly
Returns the value of attribute dependencies.
-
#dependents ⇒ Object
readonly
Returns the value of attribute dependents.
-
#expires_at ⇒ Object
readonly
Returns the value of attribute expires_at.
-
#failure ⇒ Object
readonly
Returns the value of attribute failure.
-
#jid ⇒ Object
readonly
Returns the value of attribute jid.
-
#klass_name ⇒ Object
readonly
Returns the value of attribute klass_name.
-
#original_retries ⇒ Object
readonly
Returns the value of attribute original_retries.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#raw_queue_history ⇒ Object
readonly
Returns the value of attribute raw_queue_history.
-
#retries_left ⇒ Object
readonly
Returns the value of attribute retries_left.
-
#spawned_from_jid ⇒ Object
readonly
Returns the value of attribute spawned_from_jid.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#state_changed ⇒ Object
(also: #state_changed?)
readonly
Returns the value of attribute state_changed.
-
#tags ⇒ Object
Returns the value of attribute tags.
-
#throttles ⇒ Object
Returns the value of attribute throttles.
-
#tracked ⇒ Object
readonly
Returns the value of attribute tracked.
-
#worker_name ⇒ Object
readonly
Returns the value of attribute worker_name.
Attributes inherited from BaseJob
Class Method Summary collapse
- .build(client, klass, attributes = {}) ⇒ Object
-
.build_opts_array(opts) ⇒ Object
Converts a hash of job options (as returned by job.to_hash) into the array format the reqless api expects.
- .middlewares_on(job_klass) ⇒ Object
Instance Method Summary collapse
- #[](key) ⇒ Object
- #[]=(key, val) ⇒ Object
- #cancel ⇒ Object
-
#complete(nxt = nil, options = {}) ⇒ Object
Complete a job Options include => next (String) the next queue => delay (int) how long to delay it in the next queue.
- #depend(*jids) ⇒ Object
- #description ⇒ Object
-
#enqueue_opts ⇒ Hash
Extract the enqueue options from the job.
-
#fail(group, message) ⇒ Object
Fail a job.
-
#heartbeat ⇒ Object
Heartbeat a job.
- #history ⇒ Object
-
#initialize(client, atts) ⇒ Job
constructor
A new instance of Job.
- #initially_put_at ⇒ Object
- #inspect ⇒ Object
- #log(message, data = nil) ⇒ Object
- #note_state_change(event) ⇒ Object
- #perform ⇒ Object
- #queue_history ⇒ Object
-
#requeue(queue, opts = {}) ⇒ Object
(also: #move)
Move this from it’s current queue into another.
- #retry(delay = 0, group = nil, message = nil) ⇒ Object
- #spawned_from ⇒ Object
- #tag(*tags) ⇒ Object
- #throttle_objects ⇒ Object
- #timeout ⇒ Object
- #to_hash ⇒ Object
- #to_s ⇒ Object
- #track ⇒ Object
- #ttl ⇒ Object
- #undepend(*jids) ⇒ Object
- #untag(*tags) ⇒ Object
- #untrack ⇒ Object
Methods inherited from BaseJob
Constructor Details
#initialize(client, atts) ⇒ Job
Returns a new instance of Job.
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/reqless/job.rb', line 134 def initialize(client, atts) super(client, atts.fetch('jid')) %w{ data failure dependencies dependents jid priority state tags throttles tracked }.each do |att| instance_variable_set(:"@#{att}", atts.fetch(att)) # Redis doesn't handle nil values so well, sometimes instead returning false, # so massage spawned_by_jid to consistent be nil or a jid @spawned_from_jid = atts.fetch('spawned_from_jid', nil) || nil end # Parse the data string @data = JSON.parse(@data) @expires_at = atts.fetch('expires') @klass_name = atts.fetch('klass') @queue_name = atts.fetch('queue') @worker_name = atts.fetch('worker') @original_retries = atts.fetch('retries') @retries_left = atts.fetch('remaining') @raw_queue_history = atts.fetch('history') # This is a silly side-effect of Lua doing JSON serialization @tags = [] if @tags == {} @dependents = [] if @dependents == {} @dependencies = [] if @dependencies == {} @state_changed = false @before_callbacks = Hash.new { |h, k| h[k] = [] } @after_callbacks = Hash.new { |h, k| h[k] = [] } end |
Instance Attribute Details
#data ⇒ Object
Returns the value of attribute data.
47 48 49 |
# File 'lib/reqless/job.rb', line 47 def data @data end |
#dependencies ⇒ Object (readonly)
Returns the value of attribute dependencies.
44 45 46 |
# File 'lib/reqless/job.rb', line 44 def dependencies @dependencies end |
#dependents ⇒ Object (readonly)
Returns the value of attribute dependents.
44 45 46 |
# File 'lib/reqless/job.rb', line 44 def dependents @dependents end |
#expires_at ⇒ Object (readonly)
Returns the value of attribute expires_at.
43 44 45 |
# File 'lib/reqless/job.rb', line 43 def expires_at @expires_at end |
#failure ⇒ Object (readonly)
Returns the value of attribute failure.
43 44 45 |
# File 'lib/reqless/job.rb', line 43 def failure @failure end |
#jid ⇒ Object (readonly)
Returns the value of attribute jid.
43 44 45 |
# File 'lib/reqless/job.rb', line 43 def jid @jid end |
#klass_name ⇒ Object (readonly)
Returns the value of attribute klass_name.
44 45 46 |
# File 'lib/reqless/job.rb', line 44 def klass_name @klass_name end |
#original_retries ⇒ Object (readonly)
Returns the value of attribute original_retries.
45 46 47 |
# File 'lib/reqless/job.rb', line 45 def original_retries @original_retries end |
#priority ⇒ Object
Returns the value of attribute priority.
47 48 49 |
# File 'lib/reqless/job.rb', line 47 def priority @priority end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
43 44 45 |
# File 'lib/reqless/job.rb', line 43 def queue_name @queue_name end |
#raw_queue_history ⇒ Object (readonly)
Returns the value of attribute raw_queue_history.
45 46 47 |
# File 'lib/reqless/job.rb', line 45 def raw_queue_history @raw_queue_history end |
#retries_left ⇒ Object (readonly)
Returns the value of attribute retries_left.
45 46 47 |
# File 'lib/reqless/job.rb', line 45 def retries_left @retries_left end |
#spawned_from_jid ⇒ Object (readonly)
Returns the value of attribute spawned_from_jid.
43 44 45 |
# File 'lib/reqless/job.rb', line 43 def spawned_from_jid @spawned_from_jid end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
43 44 45 |
# File 'lib/reqless/job.rb', line 43 def state @state end |
#state_changed ⇒ Object (readonly) Also known as: state_changed?
Returns the value of attribute state_changed.
46 47 48 |
# File 'lib/reqless/job.rb', line 46 def state_changed @state_changed end |
#tags ⇒ Object
Returns the value of attribute tags.
47 48 49 |
# File 'lib/reqless/job.rb', line 47 def @tags end |
#throttles ⇒ Object
Returns the value of attribute throttles.
47 48 49 |
# File 'lib/reqless/job.rb', line 47 def throttles @throttles end |
#tracked ⇒ Object (readonly)
Returns the value of attribute tracked.
44 45 46 |
# File 'lib/reqless/job.rb', line 44 def tracked @tracked end |
#worker_name ⇒ Object (readonly)
Returns the value of attribute worker_name.
43 44 45 |
# File 'lib/reqless/job.rb', line 43 def worker_name @worker_name end |
Class Method Details
.build(client, klass, attributes = {}) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/reqless/job.rb', line 88 def self.build(client, klass, attributes = {}) defaults = { 'jid' => Reqless.generate_jid, 'spawned_from_jid' => nil, 'data' => {}, 'klass' => klass.to_s, 'priority' => 0, 'tags' => [], 'worker' => 'mock_worker', 'expires' => Time.now + (60 * 60), # an hour from now 'state' => 'running', 'tracked' => false, 'queue' => 'mock_queue', 'retries' => 5, 'remaining' => 5, 'failure' => {}, 'history' => [], 'dependencies' => [], 'dependents' => [], 'throttles' => [], } attributes = defaults.merge(Reqless.stringify_hash_keys(attributes)) attributes['data'] = JSON.dump(attributes['data']) new(client, attributes) end |
.build_opts_array(opts) ⇒ Object
Converts a hash of job options (as returned by job.to_hash) into the array format the reqless api expects.
116 117 118 119 120 121 122 123 124 125 |
# File 'lib/reqless/job.rb', line 116 def self.build_opts_array(opts) result = [] result << JSON.generate(opts.fetch(:data, {})) result.concat([opts.fetch(:delay, 0)]) result.concat(['priority', opts.fetch(:priority, 0)]) result.concat(['tags', JSON.generate(opts.fetch(:tags, []))]) result.concat(['retries', opts.fetch(:retries, 5)]) result.concat(['depends', JSON.generate(opts.fetch(:depends, []))]) result.concat(['throttles', JSON.generate(opts.fetch(:throttles, []))]) end |
.middlewares_on(job_klass) ⇒ Object
127 128 129 130 131 132 |
# File 'lib/reqless/job.rb', line 127 def self.middlewares_on(job_klass) singleton_klass = job_klass.singleton_class singleton_klass.ancestors.select do |ancestor| ancestor != singleton_klass && ancestor.method_defined?(:around_perform) end end |
Instance Method Details
#[](key) ⇒ Object
170 171 172 |
# File 'lib/reqless/job.rb', line 170 def [](key) @data[key] end |
#[]=(key, val) ⇒ Object
174 175 176 |
# File 'lib/reqless/job.rb', line 174 def []=(key, val) @data[key] = val end |
#cancel ⇒ Object
326 327 328 329 330 |
# File 'lib/reqless/job.rb', line 326 def cancel note_state_change :cancel do @client.call('job.cancel', @jid) end end |
#complete(nxt = nil, options = {}) ⇒ Object
Complete a job Options include
> next (String) the next queue
> delay (int) how long to delay it in the next queue
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 |
# File 'lib/reqless/job.rb', line 310 def complete(nxt = nil, = {}) note_state_change :complete do if nxt.nil? @client.call( 'job.complete', @jid, @worker_name, @queue_name, JSON.dump(@data)) else @client.call('job.completeAndRequeue', @jid, @worker_name, @queue_name, JSON.dump(@data), 'next', nxt, 'delay', .fetch(:delay, 0), 'depends', JSON.dump(.fetch(:depends, []))) end end rescue Reqless::LuaScriptError => err raise CantCompleteError.new(err.) end |
#depend(*jids) ⇒ Object
362 363 364 |
# File 'lib/reqless/job.rb', line 362 def depend(*jids) !!@client.call('job.addDependency', @jid, *jids) end |
#description ⇒ Object
182 183 184 |
# File 'lib/reqless/job.rb', line 182 def description "#{@klass_name} (#{@jid} / #{@queue_name} / #{@state})" end |
#enqueue_opts ⇒ Hash
Extract the enqueue options from the job
258 259 260 261 262 263 264 265 266 267 |
# File 'lib/reqless/job.rb', line 258 def enqueue_opts { retries: original_retries, priority: priority, depends: dependents, tags: , throttles: throttles, data: data, } end |
#fail(group, message) ⇒ Object
Fail a job
282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/reqless/job.rb', line 282 def fail(group, ) note_state_change :fail do @client.call( 'job.fail', @jid, @worker_name, group, , JSON.dump(@data)) || false end rescue Reqless::LuaScriptError => err raise CantFailError.new(err.) end |
#heartbeat ⇒ Object
Heartbeat a job
296 297 298 299 300 301 302 |
# File 'lib/reqless/job.rb', line 296 def heartbeat @expires_at = @client.call( 'job.heartbeat', @jid, @worker_name, JSON.dump(@data)) end |
#history ⇒ Object
198 199 200 201 202 |
# File 'lib/reqless/job.rb', line 198 def history warn 'WARNING: Reqless::Job#history is deprecated; use' + "Reqless::Job#raw_queue_history instead; from:\n#{caller.first}" raw_queue_history end |
#initially_put_at ⇒ Object
218 219 220 |
# File 'lib/reqless/job.rb', line 218 def initially_put_at @initially_put_at ||= ('put', :min) end |
#inspect ⇒ Object
186 187 188 |
# File 'lib/reqless/job.rb', line 186 def inspect "<Reqless::Job #{description}>" end |
#log(message, data = nil) ⇒ Object
374 375 376 377 378 379 380 |
# File 'lib/reqless/job.rb', line 374 def log(, data = nil) if data @client.call('job.log', @jid, , JSON.dump(data)) else @client.call('job.log', @jid, ) end end |
#note_state_change(event) ⇒ Object
394 395 396 397 398 399 400 |
# File 'lib/reqless/job.rb', line 394 def note_state_change(event) @before_callbacks[event].each { |blk| blk.call(self) } result = yield @state_changed = true @after_callbacks[event].each { |blk| blk.call(self) } result end |
#perform ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/reqless/job.rb', line 59 def perform # If we can't find the class, we should fail the job, not try to process begin klass rescue NameError return fail("#{queue_name}-NameError", "Cannot find #{klass_name}") end # log a real process executing job -- before we start processing log("started by pid:#{Process.pid}") middlewares = Job.middlewares_on(klass) if middlewares.last == SupportsMiddleware klass.around_perform(self) elsif middlewares.any? raise MiddlewareMisconfiguredError, 'The middleware chain for ' + "#{klass} (#{middlewares.inspect}) is misconfigured." + 'Reqless::Job::SupportsMiddleware must be extended onto your job' + 'class first if you want to use any middleware.' elsif !klass.respond_to?(:perform) # If the klass doesn't have a :perform method, we should raise an error fail("#{queue_name}-method-missing", "#{klass_name} has no perform method") else klass.perform(self) end end |
#queue_history ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/reqless/job.rb', line 204 def queue_history @queue_history ||= @raw_queue_history.map do |history_event| history_event.each_with_object({}) do |(key, value), hash| # The only Numeric (Integer or Float) values we get in the history # are timestamps if value.is_a?(Numeric) hash[key] = Time.at(value).utc else hash[key] = value end end end end |
#requeue(queue, opts = {}) ⇒ Object Also known as: move
Move this from it’s current queue into another
270 271 272 273 274 275 276 |
# File 'lib/reqless/job.rb', line 270 def requeue(queue, opts = {}) note_state_change :requeue do @client.call('job.requeue', @client.worker_name, queue, @jid, @klass_name, *self.class.build_opts_array(self.enqueue_opts.merge!(opts)) ) end end |
#retry(delay = 0, group = nil, message = nil) ⇒ Object
348 349 350 351 352 353 354 355 356 357 358 359 360 |
# File 'lib/reqless/job.rb', line 348 def retry(delay = 0, group = nil, = nil) note_state_change :retry do if group.nil? results = @client.call( 'job.retry', @jid, @queue_name, @worker_name, delay) results.nil? ? false : results else results = @client.call( 'job.retry', @jid, @queue_name, @worker_name, delay, group, ) results.nil? ? false : results end end end |
#spawned_from ⇒ Object
222 223 224 225 |
# File 'lib/reqless/job.rb', line 222 def spawned_from return nil if @spawned_from_jid.nil? @spawned_from ||= @client.jobs[@spawned_from_jid] end |
#tag(*tags) ⇒ Object
340 341 342 |
# File 'lib/reqless/job.rb', line 340 def tag(*) JSON.parse(@client.call('job.addTag', @jid, *)) end |
#throttle_objects ⇒ Object
194 195 196 |
# File 'lib/reqless/job.rb', line 194 def throttle_objects throttles.map { |name| Throttle.new(name, client) } end |
#timeout ⇒ Object
370 371 372 |
# File 'lib/reqless/job.rb', line 370 def timeout @client.call('job.timeout', @jid) end |
#to_hash ⇒ Object
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/reqless/job.rb', line 227 def to_hash { jid: jid, spawned_from_jid: spawned_from_jid, expires_at: expires_at, state: state, queue_name: queue_name, history: raw_queue_history, worker_name: worker_name, failure: failure, klass_name: klass_name, tracked: tracked, dependencies: dependencies, dependents: dependents, original_retries: original_retries, retries_left: retries_left, data: data, priority: priority, tags: , throttles: throttles, } end |
#to_s ⇒ Object
178 179 180 |
# File 'lib/reqless/job.rb', line 178 def to_s inspect end |
#track ⇒ Object
332 333 334 |
# File 'lib/reqless/job.rb', line 332 def track @client.call('job.track', @jid) end |
#ttl ⇒ Object
190 191 192 |
# File 'lib/reqless/job.rb', line 190 def ttl @expires_at - Time.now.to_f end |
#undepend(*jids) ⇒ Object
366 367 368 |
# File 'lib/reqless/job.rb', line 366 def undepend(*jids) !!@client.call('job.removeDependency', @jid, *jids) end |
#untag(*tags) ⇒ Object
344 345 346 |
# File 'lib/reqless/job.rb', line 344 def untag(*) JSON.parse(@client.call('job.removeTag', @jid, *)) end |
#untrack ⇒ Object
336 337 338 |
# File 'lib/reqless/job.rb', line 336 def untrack @client.call('job.untrack', @jid) end |