Class: Qless::Job

Inherits:
BaseJob show all
Defined in:
lib/qless/job.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from BaseJob

#klass, #queue

Constructor Details

#initialize(client, atts) ⇒ Job

Returns a new instance of Job.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/qless/job.rb', line 56

# Builds a job from a raw, string-keyed attribute hash.
#
# Every key read below is mandatory: Hash#fetch raises KeyError when one is
# absent.
#
# @param client the client handle, forwarded to the superclass constructor
# @param atts [Hash] string-keyed job attributes
def initialize(client, atts)
  super(client, atts.fetch('jid'))

  # Attributes stored under an instance variable named after their key.
  same_named = %w[jid data priority tags state tracked failure history dependencies dependents]
  same_named.each { |name| instance_variable_set(:"@#{name}", atts.fetch(name)) }

  # Attributes whose instance variable name differs from their key.
  @expires_at       = atts.fetch('expires')
  @klass_name       = atts.fetch('klass')
  @queue_name       = atts.fetch('queue')
  @worker_name      = atts.fetch('worker')
  @original_retries = atts.fetch('retries')
  @retries_left     = atts.fetch('remaining')

  # Side effect of Lua doing the JSON handling: these list-valued fields can
  # arrive as an empty hash, so normalize them to an empty array.
  %i[@tags @dependents @dependencies].each do |ivar|
    instance_variable_set(ivar, []) if instance_variable_get(ivar) == {}
  end

  @state_changed = false
end

Instance Attribute Details

#dataObject

Returns the value of attribute data.



26
27
28
# File 'lib/qless/job.rb', line 26

# Returns the job's payload, set in the constructor from the 'data'
# attribute. The same object is read and written by #[] and #[]=.
def data
  @data
end

#dependenciesObject (readonly)

Returns the value of attribute dependencies.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'dependencies' attribute given to the constructor; an empty
# hash is normalized to an empty array there.
def dependencies
  @dependencies
end

#dependentsObject (readonly)

Returns the value of attribute dependents.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'dependents' attribute given to the constructor; an empty
# hash is normalized to an empty array there.
def dependents
  @dependents
end

#expires_atObject (readonly)

Returns the value of attribute expires_at.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'expires' attribute given to the constructor. #ttl subtracts
# the current time from this value. Note that .build defaults it to a Time.
def expires_at
  @expires_at
end

#failureObject (readonly)

Returns the value of attribute failure.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'failure' attribute given to the constructor (.build defaults
# it to an empty hash).
def failure
  @failure
end

#historyObject (readonly)

Returns the value of attribute history.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'history' attribute given to the constructor (.build defaults
# it to an empty array).
def history
  @history
end

#jidObject (readonly)

Returns the value of attribute jid.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the job id, taken from the 'jid' attribute given to the
# constructor. It is passed to every client script call made by this job.
def jid
  @jid
end

#klass_nameObject (readonly)

Returns the value of attribute klass_name.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'klass' attribute given to the constructor (.build stores
# `klass.to_s` there).
def klass_name
  @klass_name
end

#original_retriesObject (readonly)

Returns the value of attribute original_retries.



25
26
27
# File 'lib/qless/job.rb', line 25

# Returns the 'retries' attribute given to the constructor.
def original_retries
  @original_retries
end

#priorityObject

Returns the value of attribute priority.



26
27
28
# File 'lib/qless/job.rb', line 26

# Returns the job's priority, set in the constructor from the 'priority'
# attribute (.build defaults it to 0).
def priority
  @priority
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'queue' attribute given to the constructor; it is sent along
# with #complete and #retry.
def queue_name
  @queue_name
end

#retries_leftObject (readonly)

Returns the value of attribute retries_left.



25
26
27
# File 'lib/qless/job.rb', line 25

# Returns the 'remaining' attribute given to the constructor.
def retries_left
  @retries_left
end

#stateObject (readonly)

Returns the value of attribute state.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'state' attribute given to the constructor (.build defaults
# it to "running").
def state
  @state
end

#tagsObject

Returns the value of attribute tags.



26
27
28
# File 'lib/qless/job.rb', line 26

# Returns the 'tags' attribute given to the constructor; an empty hash is
# normalized to an empty array there.
def tags
  @tags
end

#trackedObject (readonly)

Returns the value of attribute tracked.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'tracked' attribute given to the constructor (.build defaults
# it to false).
def tracked
  @tracked
end

#worker_nameObject (readonly)

Returns the value of attribute worker_name.



24
25
26
# File 'lib/qless/job.rb', line 24

# Returns the 'worker' attribute given to the constructor; it is sent along
# with #complete, #fail, #heartbeat and #retry.
def worker_name
  @worker_name
end

Class Method Details

.build(client, klass, attributes = {}) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/qless/job.rb', line 32

# Builds a job from a set of mock defaults overlaid with caller-supplied
# attributes.
#
# Caller keys are stringified via Qless.stringify_hash_keys before merging,
# so they override the matching defaults. The resulting 'data' value is
# round-tripped through JSON (dump, then load) before construction.
#
# @param client the client handle passed through to the constructor
# @param klass the job class; stored as `klass.to_s`
# @param attributes [Hash] overrides for any of the default attributes
# @return the newly constructed job
def self.build(client, klass, attributes = {})
  job_atts = {
    "jid"          => Qless.generate_jid,
    "data"         => {},
    "klass"        => klass.to_s,
    "priority"     => 0,
    "tags"         => [],
    "worker"       => "mock_worker",
    "expires"      => Time.now + (60 * 60), # one hour ahead, as a Time
    "state"        => "running",
    "tracked"      => false,
    "queue"        => "mock_queue",
    "retries"      => 5,
    "remaining"    => 5,
    "failure"      => {},
    "history"      => [],
    "dependencies" => [],
    "dependents"   => []
  }.merge(Qless.stringify_hash_keys(attributes))

  serialized_data = JSON.dump(job_atts["data"])
  job_atts["data"] = JSON.load(serialized_data)

  new(client, job_atts)
end

Instance Method Details

#[](key) ⇒ Object



83
84
85
# File 'lib/qless/job.rb', line 83

# Looks up +key+ in the job's data (delegates to `@data[key]`).
def [](key)
  @data[key]
end

#[]=(key, val) ⇒ Object



87
88
89
# File 'lib/qless/job.rb', line 87

# Stores +val+ under +key+ in the job's data (delegates to `@data[key] = val`).
# Only the local object is changed here; @data is serialized and sent by
# the methods that call JSON.generate(@data).
def []=(key, val)
  @data[key] = val
end

#cancelObject



159
160
161
162
163
# File 'lib/qless/job.rb', line 159

# Cancels this job by invoking the client's `_cancel` script with the job id,
# wrapped in note_state_change.
def cancel
  note_state_change { @client._cancel.call([], [@jid]) }
end

#complete(nxt = nil, options = {}) ⇒ Object

Complete a job. Options include:

> next (String) the next queue

> delay (int) how long to delay it in the next queue



141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/qless/job.rb', line 141

# Completes the job through the client's `_complete` script, wrapped in
# note_state_change.
#
# When +nxt+ is given, the job is additionally handed the 'next', 'delay'
# and 'depends' arguments so it can be advanced to another queue.
#
# @param nxt [String, nil] name of the next queue, or nil to simply complete
# @param options [Hash] :delay (default 0) and :depends (default []), only
#   consulted when +nxt+ is given
# @return false when note_state_change yields nil, otherwise its result
def complete(nxt=nil, options={})
  outcome = note_state_change do
    script = @client._complete
    args = [@jid, @worker_name, @queue_name, Time.now.to_f, JSON.generate(@data)]

    unless nxt.nil?
      args.push('next', nxt,
                'delay', options.fetch(:delay, 0),
                'depends', JSON.generate(options.fetch(:depends, [])))
    end

    script.call([], args)
  end

  return false if outcome.nil?
  outcome
end

#depend(*jids) ⇒ Object



188
189
190
# File 'lib/qless/job.rb', line 188

# Invokes the client's `_depends` script with this job's id, the 'on' flag
# and the given job ids.
#
# @param jids job ids appended after the 'on' flag
# @return [Boolean] truthiness of the script's reply
def depend(*jids)
  reply = @client._depends.call([], [@jid, 'on', *jids])
  reply ? true : false
end

#descriptionObject



95
96
97
# File 'lib/qless/job.rb', line 95

# One-line human-readable summary: the job id followed by the class name and
# queue name in parentheses.
#
# @return [String]
def description
  origin = "#{@klass_name} / #{@queue_name}"
  "#{@jid} (#{origin})"
end

#fail(group, message) ⇒ Object

Fail a job



117
118
119
120
121
122
123
124
125
126
# File 'lib/qless/job.rb', line 117

# Fails the job through the client's `_fail` script, wrapped in
# note_state_change.
#
# The script receives the job id, worker name, +group+, +message+, the
# current time as a float and the JSON-encoded data. The wrapped block
# evaluates to the script's reply, or false when that reply is nil/false.
#
# @param group the failure group passed to the script
# @param message the failure message passed to the script
def fail(group, message)
  note_state_change do
    script  = @client._fail
    payload = [@jid, @worker_name, group, message, Time.now.to_f, JSON.generate(@data)]
    script.call([], payload) || false
  end
end

#heartbeatObject

Heartbeat a job



129
130
131
132
133
134
135
# File 'lib/qless/job.rb', line 129

# Heartbeats the job through the client's `_heartbeat` script, sending the
# job id, worker name, current time as a float and the JSON-encoded data.
#
# @return the script's reply, or false when that reply is nil/false
def heartbeat
  script  = @client._heartbeat
  payload = [@jid, @worker_name, Time.now.to_f, JSON.generate(@data)]
  script.call([], payload) || false
end

#inspectObject



99
100
101
# File 'lib/qless/job.rb', line 99

# Returns #description wrapped as "<Qless::Job ...>".
def inspect
  "<Qless::Job #{description}>"
end

#move(queue) ⇒ Object

Move this job from its current queue into another.



108
109
110
111
112
113
114
# File 'lib/qless/job.rb', line 108

# Moves the job into +queue+ through the client's `_put` script, wrapped in
# note_state_change.
#
# The target queue is passed as the script's only key; the arguments are the
# job id, class name, JSON-encoded data, current time as a float and a
# trailing 0.
#
# @param queue the destination queue passed as the script key
def move(queue)
  note_state_change do
    script = @client._put
    script.call([queue], [@jid, @klass_name, JSON.generate(@data), Time.now.to_f, 0])
  end
end

#performObject



28
29
30
# File 'lib/qless/job.rb', line 28

# Runs the job by passing this instance to `klass.perform`. `klass` is
# listed among the methods inherited from BaseJob.
def perform
  klass.perform(self)
end

#retry(delay = 0) ⇒ Object



181
182
183
184
185
186
# File 'lib/qless/job.rb', line 181

# Retries the job through the client's `_retry` script, wrapped in
# note_state_change.
#
# The script receives the job id, queue name, worker name, the current time
# as a float and +delay+. The wrapped block evaluates to false when the
# script replies nil, otherwise to the reply itself.
#
# @param delay the delay value forwarded to the script (default 0)
def retry(delay=0)
  note_state_change do
    script = @client._retry
    reply = script.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay])
    if reply.nil?
      false
    else
      reply
    end
  end
end

#state_changed?Boolean

Returns:

  • (Boolean)


155
156
157
# File 'lib/qless/job.rb', line 155

# Returns the @state_changed flag, which the constructor initializes to false.
# NOTE(review): presumably flipped by note_state_change, which is not
# visible in this section — confirm.
def state_changed?
  @state_changed
end

#tag(*tags) ⇒ Object



173
174
175
# File 'lib/qless/job.rb', line 173

# Invokes the client's `_tag` script with the 'add' command, this job's id,
# the current time as a float and the given tags.
#
# @param tags tags appended to the script arguments
# @return the script's reply
def tag(*tags)
  script = @client._tag
  script.call([], ['add', @jid, Time.now.to_f, *tags])
end

#to_sObject



91
92
93
# File 'lib/qless/job.rb', line 91

# String form of the job; identical to #inspect.
def to_s
  inspect
end

#trackObject



165
166
167
# File 'lib/qless/job.rb', line 165

# Invokes the client's `_track` script with the 'track' command, this job's
# id and the current time as a float.
#
# @return the script's reply
def track
  script = @client._track
  script.call([], ['track', @jid, Time.now.to_f])
end

#ttlObject



103
104
105
# File 'lib/qless/job.rb', line 103

# Seconds remaining until this job expires.
#
# @expires_at is whatever the constructor received under 'expires'. The
# default supplied by .build is a Time, and `Time - Float` produces another
# Time rather than a duration, so a Time expiry is first converted to epoch
# seconds. Any other value is used as-is, exactly as before.
#
# @return [Float] seconds until expiry; negative once the expiry has passed
def ttl
  expiry = @expires_at.is_a?(Time) ? @expires_at.to_f : @expires_at
  expiry - Time.now.to_f
end

#undepend(*jids) ⇒ Object



192
193
194
# File 'lib/qless/job.rb', line 192

# Invokes the client's `_depends` script with this job's id, the 'off' flag
# and the given job ids.
#
# @param jids job ids appended after the 'off' flag
# @return [Boolean] truthiness of the script's reply
def undepend(*jids)
  reply = @client._depends.call([], [@jid, 'off', *jids])
  reply ? true : false
end

#untag(*tags) ⇒ Object



177
178
179
# File 'lib/qless/job.rb', line 177

# Invokes the client's `_tag` script with the 'remove' command, this job's
# id, the current time as a float and the given tags.
#
# @param tags tags appended to the script arguments
# @return the script's reply
def untag(*tags)
  script = @client._tag
  script.call([], ['remove', @jid, Time.now.to_f, *tags])
end

#untrackObject



169
170
171
# File 'lib/qless/job.rb', line 169

# Invokes the client's `_track` script with the 'untrack' command, this
# job's id and the current time as a float.
#
# @return the script's reply
def untrack
  script = @client._track
  script.call([], ['untrack', @jid, Time.now.to_f])
end