Class: Qless::Job
Instance Attribute Summary collapse
-
#data ⇒ Object
Returns the value of attribute data.
-
#dependencies ⇒ Object
readonly
Returns the value of attribute dependencies.
-
#dependents ⇒ Object
readonly
Returns the value of attribute dependents.
-
#expires_at ⇒ Object
readonly
Returns the value of attribute expires_at.
-
#failure ⇒ Object
readonly
Returns the value of attribute failure.
-
#history ⇒ Object
readonly
Returns the value of attribute history.
-
#jid ⇒ Object
readonly
Returns the value of attribute jid.
-
#klass_name ⇒ Object
readonly
Returns the value of attribute klass_name.
-
#original_retries ⇒ Object
readonly
Returns the value of attribute original_retries.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#retries_left ⇒ Object
readonly
Returns the value of attribute retries_left.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#tags ⇒ Object
Returns the value of attribute tags.
-
#tracked ⇒ Object
readonly
Returns the value of attribute tracked.
-
#worker_name ⇒ Object
readonly
Returns the value of attribute worker_name.
Class Method Summary collapse
Instance Method Summary collapse
- #[](key) ⇒ Object
- #[]=(key, val) ⇒ Object
- #cancel ⇒ Object
-
#complete(nxt = nil, options = {}) ⇒ Object
Complete a job. Options include: next (String), the next queue; delay (int), how long to delay it in the next queue.
- #depend(*jids) ⇒ Object
- #description ⇒ Object
-
#fail(group, message) ⇒ Object
Fail a job.
-
#heartbeat ⇒ Object
Heartbeat a job.
-
#initialize(client, atts) ⇒ Job
constructor
A new instance of Job.
- #inspect ⇒ Object
-
#move(queue) ⇒ Object
Move this from its current queue into another.
- #perform ⇒ Object
- #retry(delay = 0) ⇒ Object
- #state_changed? ⇒ Boolean
- #tag(*tags) ⇒ Object
- #to_s ⇒ Object
- #track ⇒ Object
- #ttl ⇒ Object
- #undepend(*jids) ⇒ Object
- #untag(*tags) ⇒ Object
- #untrack ⇒ Object
Methods inherited from BaseJob
Constructor Details
#initialize(client, atts) ⇒ Job
Returns a new instance of Job.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/qless/job.rb', line 56
# Populates a job from a string-keyed attribute hash.
#
# @param client forwarded, together with the jid, to the superclass constructor
# @param atts [Hash] job attributes; every key read here is mandatory, since
#   Hash#fetch raises KeyError for a missing key
def initialize(client, atts)
  super(client, atts.fetch('jid'))

  # Attributes stored under an instance variable named after their key.
  %w{jid data priority tags state tracked failure history dependencies dependents}.each do |name|
    instance_variable_set(:"@#{name}", atts.fetch(name))
  end

  # Attributes whose instance variable name differs from their key.
  @expires_at       = atts.fetch('expires')
  @klass_name       = atts.fetch('klass')
  @queue_name       = atts.fetch('queue')
  @worker_name      = atts.fetch('worker')
  @original_retries = atts.fetch('retries')
  @retries_left     = atts.fetch('remaining')

  # A side effect of Lua doing the JSON parsing: a list-valued attribute can
  # arrive as an empty hash, so swap that for an empty array.
  @tags         = [] if @tags == {}
  @dependents   = [] if @dependents == {}
  @dependencies = [] if @dependencies == {}

  @state_changed = false
end
Instance Attribute Details
#data ⇒ Object
Returns the value of attribute data.
26 27 28 |
# File 'lib/qless/job.rb', line 26
# @return [Object] the value held in @data
def data
  @data
end
#dependencies ⇒ Object (readonly)
Returns the value of attribute dependencies.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @dependencies
def dependencies
  @dependencies
end
#dependents ⇒ Object (readonly)
Returns the value of attribute dependents.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @dependents
def dependents
  @dependents
end
#expires_at ⇒ Object (readonly)
Returns the value of attribute expires_at.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @expires_at
def expires_at
  @expires_at
end
#failure ⇒ Object (readonly)
Returns the value of attribute failure.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @failure
def failure
  @failure
end
#history ⇒ Object (readonly)
Returns the value of attribute history.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @history
def history
  @history
end
#jid ⇒ Object (readonly)
Returns the value of attribute jid.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @jid
def jid
  @jid
end
#klass_name ⇒ Object (readonly)
Returns the value of attribute klass_name.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @klass_name
def klass_name
  @klass_name
end
#original_retries ⇒ Object (readonly)
Returns the value of attribute original_retries.
25 26 27 |
# File 'lib/qless/job.rb', line 25
# @return [Object] the value held in @original_retries
def original_retries
  @original_retries
end
#priority ⇒ Object
Returns the value of attribute priority.
26 27 28 |
# File 'lib/qless/job.rb', line 26
# @return [Object] the value held in @priority
def priority
  @priority
end
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @queue_name
def queue_name
  @queue_name
end
#retries_left ⇒ Object (readonly)
Returns the value of attribute retries_left.
25 26 27 |
# File 'lib/qless/job.rb', line 25
# @return [Object] the value held in @retries_left
def retries_left
  @retries_left
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @state
def state
  @state
end
#tags ⇒ Object
Returns the value of attribute tags.
26 27 28 |
# File 'lib/qless/job.rb', line 26
# Reader for the job's tags. The method name had been dropped from this
# listing (it read "def @tags end"); it is restored here.
#
# @return [Object] the value held in @tags
def tags
  @tags
end
#tracked ⇒ Object (readonly)
Returns the value of attribute tracked.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @tracked
def tracked
  @tracked
end
#worker_name ⇒ Object (readonly)
Returns the value of attribute worker_name.
24 25 26 |
# File 'lib/qless/job.rb', line 24
# @return [Object] the value held in @worker_name
def worker_name
  @worker_name
end
Class Method Details
.build(client, klass, attributes = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/qless/job.rb', line 32
# Constructs a job from mock defaults overlaid with caller-supplied attributes.
#
# @param client passed straight through to the constructor
# @param klass stored, via #to_s, under the "klass" key
# @param attributes [Hash] overrides for the defaults; keys go through
#   Qless.stringify_hash_keys before being merged
# @return the object returned by `new(client, merged_attributes)`
def self.build(client, klass, attributes = {})
  mock_attributes = {
    "jid"          => Qless.generate_jid,
    "data"         => {},
    "klass"        => klass.to_s,
    "priority"     => 0,
    "tags"         => [],
    "worker"       => "mock_worker",
    "expires"      => Time.now + (60 * 60), # an hour from now
    "state"        => "running",
    "tracked"      => false,
    "queue"        => "mock_queue",
    "retries"      => 5,
    "remaining"    => 5,
    "failure"      => {},
    "history"      => [],
    "dependencies" => [],
    "dependents"   => []
  }

  merged = mock_attributes.merge(Qless.stringify_hash_keys(attributes))

  # Round-trip the data through JSON (dump, then load) -- presumably so it has
  # the same shape as data read back from storage; confirm against callers.
  merged["data"] = JSON.load(JSON.dump(merged["data"]))

  new(client, merged)
end
Instance Method Details
#[](key) ⇒ Object
83 84 85 |
# File 'lib/qless/job.rb', line 83
# Looks up a key in the job's data.
#
# @param key the key to read from @data
# @return whatever @data[key] returns
def [](key)
  @data[key]
end
#[]=(key, val) ⇒ Object
87 88 89 |
# File 'lib/qless/job.rb', line 87
# Stores a value in the job's data.
#
# @param key the key to write in @data
# @param val the value to store under key
def []=(key, val)
  @data[key] = val
end
#cancel ⇒ Object
159 160 161 162 163 |
# File 'lib/qless/job.rb', line 159
# Calls the client's _cancel callable with this job's jid, inside
# note_state_change.
#
# @return whatever note_state_change returns for the block
def cancel
  note_state_change { @client._cancel.call([], [@jid]) }
end
#complete(nxt = nil, options = {}) ⇒ Object
Complete a job. Options include:
=> next (String) the next queue
=> delay (int) how long to delay it in the next queue
141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/qless/job.rb', line 141
# Completes the job, optionally handing it to another queue.
#
# The `options` identifier had been dropped from this listing (signature and
# both fetch calls); it is restored here.
#
# @param nxt [String, nil] name of the next queue; when nil, only the
#   five base arguments are sent and `options` is not consulted
# @param options [Hash] read only when nxt is given:
#   :delay   -- sent after 'delay'; defaults to 0
#   :depends -- JSON-encoded and sent after 'depends'; defaults to []
# @return the callable's result, or false when that result is nil or false
def complete(nxt = nil, options = {})
  response = note_state_change do
    script = @client._complete
    args = [@jid, @worker_name, @queue_name, Time.now.to_f, JSON.generate(@data)]

    unless nxt.nil?
      args += [
        'next', nxt,
        'delay', options.fetch(:delay, 0),
        'depends', JSON.generate(options.fetch(:depends, []))
      ]
    end

    script.call([], args)
  end

  response || false
end
#depend(*jids) ⇒ Object
188 189 190 |
# File 'lib/qless/job.rb', line 188
# Calls the client's _depends callable with this jid, 'on', and the given jids.
#
# @param jids [Array] jids appended after the 'on' marker
# @return [Boolean] the callable's result coerced to true/false
def depend(*jids)
  outcome = @client._depends.call([], [@jid, 'on', *jids])
  outcome ? true : false
end
#description ⇒ Object
95 96 97 |
# File 'lib/qless/job.rb', line 95
# One-line summary of the job: its jid, then class name and queue name in
# parentheses.
#
# @return [String]
def description
  "#{@jid} (#{@klass_name} / #{@queue_name})"
end
#fail(group, message) ⇒ Object
Fail a job
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/qless/job.rb', line 117
# Fails the job.
#
# The `message` identifier had been dropped from this listing (signature and
# argument list); it is restored here.
#
# @param group passed to the client's _fail callable after the worker name
# @param message passed to the client's _fail callable after the group
# @return the callable's result, or false when that result is nil or false
def fail(group, message)
  note_state_change do
    script = @client._fail
    args = [@jid, @worker_name, group, message, Time.now.to_f, JSON.generate(@data)]
    script.call([], args) || false
  end
end
#heartbeat ⇒ Object
Heartbeat a job
129 130 131 132 133 134 135 |
# File 'lib/qless/job.rb', line 129
# Heartbeats the job: calls the client's _heartbeat callable with the jid,
# worker name, current time and JSON-encoded data.
#
# @return the callable's result, or false when that result is nil or false
def heartbeat
  script = @client._heartbeat
  args = [@jid, @worker_name, Time.now.to_f, JSON.generate(@data)]
  script.call([], args) || false
end
#inspect ⇒ Object
99 100 101 |
# File 'lib/qless/job.rb', line 99
# Wraps #description in a "<Qless::Job ...>" marker.
#
# @return [String]
def inspect
  "<Qless::Job #{description}>"
end
#move(queue) ⇒ Object
Move this from its current queue into another
108 109 110 111 112 113 114 |
# File 'lib/qless/job.rb', line 108
# Moves this job from its current queue into another by calling the client's
# _put callable, inside note_state_change.
#
# @param queue passed as the single key to the _put callable
# @return whatever note_state_change returns for the block
def move(queue)
  note_state_change do
    script = @client._put
    # The trailing 0 is passed through as-is; its meaning is defined by the
    # _put callable, which is not visible here.
    put_args = [@jid, @klass_name, JSON.generate(@data), Time.now.to_f, 0]
    script.call([queue], put_args)
  end
end
#perform ⇒ Object
28 29 30 |
# File 'lib/qless/job.rb', line 28
# Delegates to `klass.perform`, handing it this job.
#
# @return whatever klass.perform returns
def perform
  klass.perform(self)
end
#retry(delay = 0) ⇒ Object
181 182 183 184 185 186 |
# File 'lib/qless/job.rb', line 181
# Calls the client's _retry callable for this job, inside note_state_change.
#
# @param delay passed as the last argument to the callable; defaults to 0
# @return the callable's result, or false when that result is nil or false
def retry(delay = 0)
  note_state_change do
    script = @client._retry
    outcome = script.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay])
    outcome || false
  end
end
#state_changed? ⇒ Boolean
155 156 157 |
# File 'lib/qless/job.rb', line 155
# @return the value held in @state_changed
def state_changed?
  @state_changed
end
#tag(*tags) ⇒ Object
173 174 175 |
# File 'lib/qless/job.rb', line 173
# Calls the client's _tag callable with 'add', this jid, the current time and
# the given tags.
#
# The `tags` splat name had been dropped from this listing ("def tag(*)");
# it is restored here.
#
# @param tags [Array] tags appended to the callable's argument list
# @return whatever the callable returns
def tag(*tags)
  @client._tag.call([], ['add', @jid, Time.now.to_f] + tags)
end
#to_s ⇒ Object
91 92 93 |
# File 'lib/qless/job.rb', line 91
# String form of the job; identical to #inspect.
#
# @return whatever #inspect returns
def to_s
  inspect
end
#track ⇒ Object
165 166 167 |
# File 'lib/qless/job.rb', line 165
# Calls the client's _track callable with 'track', this jid and the current
# time.
#
# @return whatever the callable returns
def track
  track_args = ['track', @jid, Time.now.to_f]
  @client._track.call([], track_args)
end
#ttl ⇒ Object
103 104 105 |
# File 'lib/qless/job.rb', line 103
# Seconds remaining until the job expires.
#
# @expires_at is normalised with #to_f before subtracting. Without that, a
# Time expiry (which is what .build supplies by default) would make the
# subtraction return a Time instead of a number of seconds. Numeric expiries
# give the same result as before.
#
# @return [Float] @expires_at minus the current time, in seconds
def ttl
  @expires_at.to_f - Time.now.to_f
end
#undepend(*jids) ⇒ Object
192 193 194 |
# File 'lib/qless/job.rb', line 192
# Calls the client's _depends callable with this jid, 'off', and the given jids.
#
# @param jids [Array] jids appended after the 'off' marker
# @return [Boolean] the callable's result coerced to true/false
def undepend(*jids)
  outcome = @client._depends.call([], [@jid, 'off', *jids])
  outcome ? true : false
end
#untag(*tags) ⇒ Object
177 178 179 |
# File 'lib/qless/job.rb', line 177
# Calls the client's _tag callable with 'remove', this jid, the current time
# and the given tags.
#
# The `tags` splat name had been dropped from this listing ("def untag(*)");
# it is restored here.
#
# @param tags [Array] tags appended to the callable's argument list
# @return whatever the callable returns
def untag(*tags)
  @client._tag.call([], ['remove', @jid, Time.now.to_f] + tags)
end
#untrack ⇒ Object
169 170 171 |
# File 'lib/qless/job.rb', line 169
# Calls the client's _track callable with 'untrack', this jid and the current
# time.
#
# @return whatever the callable returns
def untrack
  untrack_args = ['untrack', @jid, Time.now.to_f]
  @client._track.call([], untrack_args)
end