Class: Qless::Job
Defined Under Namespace
Modules: SupportsMiddleware
Constant Summary collapse
Instance Attribute Summary collapse
-
#data ⇒ Object
Returns the value of attribute data.
-
#dependencies ⇒ Object
readonly
Returns the value of attribute dependencies.
-
#dependents ⇒ Object
readonly
Returns the value of attribute dependents.
-
#expires_at ⇒ Object
readonly
Returns the value of attribute expires_at.
-
#failure ⇒ Object
readonly
Returns the value of attribute failure.
-
#jid ⇒ Object
readonly
Returns the value of attribute jid.
-
#klass_name ⇒ Object
readonly
Returns the value of attribute klass_name.
-
#original_retries ⇒ Object
readonly
Returns the value of attribute original_retries.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#raw_queue_history ⇒ Object
readonly
Returns the value of attribute raw_queue_history.
-
#retries_left ⇒ Object
readonly
Returns the value of attribute retries_left.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#tags ⇒ Object
Returns the value of attribute tags.
-
#tracked ⇒ Object
readonly
Returns the value of attribute tracked.
-
#worker_name ⇒ Object
readonly
Returns the value of attribute worker_name.
Attributes inherited from BaseJob
Class Method Summary collapse
Instance Method Summary collapse
- #[](key) ⇒ Object
- #[]=(key, val) ⇒ Object
- #cancel ⇒ Object
-
#complete(nxt = nil, options = {}) ⇒ Object
Complete a job Options include => next (String) the next queue => delay (int) how long to delay it in the next queue.
- #depend(*jids) ⇒ Object
- #description ⇒ Object
-
#fail(group, message) ⇒ Object
Fail a job.
-
#heartbeat ⇒ Object
Heartbeat a job.
- #history ⇒ Object
-
#initialize(client, atts) ⇒ Job
constructor
A new instance of Job.
- #initially_put_at ⇒ Object
- #inspect ⇒ Object
-
#move(queue) ⇒ Object
Move this from it’s current queue into another.
- #perform ⇒ Object
- #queue_history ⇒ Object
- #reconnect_to_redis ⇒ Object
- #retry(delay = 0) ⇒ Object
- #state_changed? ⇒ Boolean
- #tag(*tags) ⇒ Object
- #to_hash ⇒ Object
- #to_s ⇒ Object
- #track ⇒ Object
- #ttl ⇒ Object
- #undepend(*jids) ⇒ Object
- #untag(*tags) ⇒ Object
- #untrack ⇒ Object
Methods inherited from BaseJob
Constructor Details
#initialize(client, atts) ⇒ Job
Returns a new instance of Job.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/qless/job.rb', line 81 def initialize(client, atts) super(client, atts.fetch('jid')) %w{jid data priority tags state tracked failure dependencies dependents}.each do |att| self.instance_variable_set("@#{att}".to_sym, atts.fetch(att)) end @expires_at = atts.fetch('expires') @klass_name = atts.fetch('klass') @queue_name = atts.fetch('queue') @worker_name = atts.fetch('worker') @original_retries = atts.fetch('retries') @retries_left = atts.fetch('remaining') @raw_queue_history = atts.fetch('history') # This is a silly side-effect of Lua doing JSON parsing @tags = [] if @tags == {} @dependents = [] if @dependents == {} @dependencies = [] if @dependencies == {} @state_changed = false @before_callbacks = Hash.new { |h, k| h[k] = [] } @after_callbacks = Hash.new { |h, k| h[k] = [] } end |
Instance Attribute Details
#data ⇒ Object
Returns the value of attribute data.
27 28 29 |
# File 'lib/qless/job.rb', line 27 def data @data end |
#dependencies ⇒ Object (readonly)
Returns the value of attribute dependencies.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def dependencies @dependencies end |
#dependents ⇒ Object (readonly)
Returns the value of attribute dependents.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def dependents @dependents end |
#expires_at ⇒ Object (readonly)
Returns the value of attribute expires_at.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def expires_at @expires_at end |
#failure ⇒ Object (readonly)
Returns the value of attribute failure.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def failure @failure end |
#jid ⇒ Object (readonly)
Returns the value of attribute jid.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def jid @jid end |
#klass_name ⇒ Object (readonly)
Returns the value of attribute klass_name.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def klass_name @klass_name end |
#original_retries ⇒ Object (readonly)
Returns the value of attribute original_retries.
26 27 28 |
# File 'lib/qless/job.rb', line 26 def original_retries @original_retries end |
#priority ⇒ Object
Returns the value of attribute priority.
27 28 29 |
# File 'lib/qless/job.rb', line 27 def priority @priority end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def queue_name @queue_name end |
#raw_queue_history ⇒ Object (readonly)
Returns the value of attribute raw_queue_history.
26 27 28 |
# File 'lib/qless/job.rb', line 26 def raw_queue_history @raw_queue_history end |
#retries_left ⇒ Object (readonly)
Returns the value of attribute retries_left.
26 27 28 |
# File 'lib/qless/job.rb', line 26 def retries_left @retries_left end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def state @state end |
#tags ⇒ Object
Returns the value of attribute tags.
27 28 29 |
# File 'lib/qless/job.rb', line 27 def @tags end |
#tracked ⇒ Object (readonly)
Returns the value of attribute tracked.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def tracked @tracked end |
#worker_name ⇒ Object (readonly)
Returns the value of attribute worker_name.
25 26 27 |
# File 'lib/qless/job.rb', line 25 def worker_name @worker_name end |
Class Method Details
.build(client, klass, attributes = {}) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/qless/job.rb', line 51 def self.build(client, klass, attributes = {}) defaults = { "jid" => Qless.generate_jid, "data" => {}, "klass" => klass.to_s, "priority" => 0, "tags" => [], "worker" => "mock_worker", "expires" => Time.now + (60 * 60), # an hour from now "state" => "running", "tracked" => false, "queue" => "mock_queue", "retries" => 5, "remaining" => 5, "failure" => {}, "history" => [], "dependencies" => [], "dependents" => [] } attributes = defaults.merge(Qless.stringify_hash_keys(attributes)) attributes["data"] = JSON.parse(JSON.dump attributes["data"]) new(client, attributes) end |
.middlewares_on(job_klass) ⇒ Object
75 76 77 78 79 |
# File 'lib/qless/job.rb', line 75 def self.middlewares_on(job_klass) job_klass.singleton_class.ancestors.select do |ancestor| ancestor.method_defined?(:around_perform) end end |
Instance Method Details
#[](key) ⇒ Object
111 112 113 |
# File 'lib/qless/job.rb', line 111 def [](key) @data[key] end |
#[]=(key, val) ⇒ Object
115 116 117 |
# File 'lib/qless/job.rb', line 115 def []=(key, val) @data[key] = val end |
#cancel ⇒ Object
248 249 250 251 252 |
# File 'lib/qless/job.rb', line 248 def cancel note_state_change :cancel do @client._cancel.call([], [@jid]) end end |
#complete(nxt = nil, options = {}) ⇒ Object
Complete a job Options include
> next (String) the next queue
> delay (int) how long to delay it in the next queue
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/qless/job.rb', line 219 def complete(nxt=nil, ={}) note_state_change :complete do response = if nxt.nil? @client._complete.call([], [ @jid, @worker_name, @queue_name, Time.now.to_f, JSON.generate(@data)]) else @client._complete.call([], [ @jid, @worker_name, @queue_name, Time.now.to_f, JSON.generate(@data), 'next', nxt, 'delay', .fetch(:delay, 0), 'depends', JSON.generate(.fetch(:depends, []))]) end if response response else description = if reloaded_instance = @client.jobs[@jid] reloaded_instance.description else self.description + " -- can't be reloaded" end raise CantCompleteError, "Failed to complete #{description}" end end end |
#depend(*jids) ⇒ Object
277 278 279 |
# File 'lib/qless/job.rb', line 277 def depend(*jids) !!@client._depends.call([], [@jid, 'on'] + jids) end |
#description ⇒ Object
123 124 125 |
# File 'lib/qless/job.rb', line 123 def description "#{@klass_name} (#{@jid} / #{@queue_name} / #{@state})" end |
#fail(group, message) ⇒ Object
Fail a job
193 194 195 196 197 198 199 200 201 202 |
# File 'lib/qless/job.rb', line 193 def fail(group, ) note_state_change :fail do @client._fail.call([], [ @jid, @worker_name, group, , Time.now.to_f, JSON.generate(@data)]) || false end end |
#heartbeat ⇒ Object
Heartbeat a job
205 206 207 208 209 210 211 |
# File 'lib/qless/job.rb', line 205 def heartbeat() @client._heartbeat.call([], [ @jid, @worker_name, Time.now.to_f, JSON.generate(@data)]) || false end |
#history ⇒ Object
139 140 141 142 143 |
# File 'lib/qless/job.rb', line 139 def history warn "WARNING: Qless::Job#history is deprecated; use Qless::Job#raw_queue_history instead" + "; called from:\n#{caller.first}\n" raw_queue_history end |
#initially_put_at ⇒ Object
158 159 160 |
# File 'lib/qless/job.rb', line 158 def initially_put_at @initially_put_at ||= ('put', :min) end |
#inspect ⇒ Object
127 128 129 |
# File 'lib/qless/job.rb', line 127 def inspect "<Qless::Job #{description}>" end |
#move(queue) ⇒ Object
Move this from it’s current queue into another
184 185 186 187 188 189 190 |
# File 'lib/qless/job.rb', line 184 def move(queue) note_state_change :move do @client._put.call([queue], [ @jid, @klass_name, JSON.generate(@data), Time.now.to_f, 0 ]) end end |
#perform ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/qless/job.rb', line 37 def perform middlewares = Job.middlewares_on(klass) if middlewares.last == SupportsMiddleware klass.around_perform(self) elsif middlewares.any? raise MiddlewareMisconfiguredError, "The middleware chain for #{klass} " + "(#{middlewares.inspect}) is misconfigured. Qless::Job::SupportsMiddleware " + "must be extended onto your job class first if you want to use any middleware." else klass.perform(self) end end |
#queue_history ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/qless/job.rb', line 145 def queue_history @queue_history ||= @raw_queue_history.map do |history_event| history_event.each_with_object({}) do |(key, value), hash| # The only Numeric (Integer or Float) values we get in the history are timestamps hash[key] = if value.is_a?(Numeric) Time.at(value).utc else value end end end end |
#reconnect_to_redis ⇒ Object
135 136 137 |
# File 'lib/qless/job.rb', line 135 def reconnect_to_redis @client.redis.client.reconnect end |
#retry(delay = 0) ⇒ Object
270 271 272 273 274 275 |
# File 'lib/qless/job.rb', line 270 def retry(delay=0) note_state_change :retry do results = @client._retry.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay]) results.nil? ? false : results end end |
#state_changed? ⇒ Boolean
244 245 246 |
# File 'lib/qless/job.rb', line 244 def state_changed? @state_changed end |
#tag(*tags) ⇒ Object
262 263 264 |
# File 'lib/qless/job.rb', line 262 def tag(*) @client._tag.call([], ['add', @jid, Time.now.to_f] + ) end |
#to_hash ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/qless/job.rb', line 162 def to_hash { jid: jid, expires_at: expires_at, state: state, queue_name: queue_name, history: raw_queue_history, worker_name: worker_name, failure: failure, klass_name: klass_name, tracked: tracked, dependencies: dependencies, dependents: dependents, original_retries: original_retries, retries_left: retries_left, data: data, priority: priority, tags: } end |
#to_s ⇒ Object
119 120 121 |
# File 'lib/qless/job.rb', line 119 def to_s inspect end |
#track ⇒ Object
254 255 256 |
# File 'lib/qless/job.rb', line 254 def track() @client._track.call([], ['track', @jid, Time.now.to_f]) end |
#ttl ⇒ Object
131 132 133 |
# File 'lib/qless/job.rb', line 131 def ttl @expires_at - Time.now.to_f end |
#undepend(*jids) ⇒ Object
281 282 283 |
# File 'lib/qless/job.rb', line 281 def undepend(*jids) !!@client._depends.call([], [@jid, 'off'] + jids) end |
#untag(*tags) ⇒ Object
266 267 268 |
# File 'lib/qless/job.rb', line 266 def untag(*) @client._tag.call([], ['remove', @jid, Time.now.to_f] + ) end |
#untrack ⇒ Object
258 259 260 |
# File 'lib/qless/job.rb', line 258 def untrack @client._track.call([], ['untrack', @jid, Time.now.to_f]) end |