Class: Burst::Job

Inherits:
Object
  • Object
show all
Includes:
Model
Defined in:
lib/burst/job.rb

Defined Under Namespace

Classes: Error

Constant Summary collapse

SUSPEND =
'suspend'.freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workflow, hash_store = {}) ⇒ Job

Returns a new instance of Job.



12
13
14
15
# File 'lib/burst/job.rb', line 12

# Creates a job attached to the given workflow.
#
# @param workflow [Object] owning workflow; kept in @workflow, where the
#   other methods of this class read its id, lock and job storage
# @param hash_store [Hash] initial attribute values, forwarded to
#   #assign_default_values, which fills in whatever is missing
def initialize(workflow, hash_store = {})
  @workflow = workflow
  # Must run after @workflow is set: the defaults read @workflow.id.
  assign_default_values(hash_store)
end

Class Method Details

.from_hash(workflow, hash_store) ⇒ Object



39
40
41
# File 'lib/burst/job.rb', line 39

# Rebuilds a job object from its hash representation.
#
# The class name stored under :klass is resolved with String#constantize
# (not defined in this file -- presumably ActiveSupport) and instantiated
# with the same arguments as Job#initialize.
#
# NOTE(review): whatever class hash_store[:klass] names gets instantiated;
# confirm the hash only ever comes from trusted storage.
#
# @param workflow [Object] workflow the job belongs to
# @param hash_store [Hash] job attributes; must contain :klass
# @return [Object] a new instance of the class named by hash_store[:klass]
def self.from_hash(workflow, hash_store)
  job_class = hash_store[:klass].constantize
  job_class.new(workflow, hash_store)
end

Instance Method Details

#assign_default_values(hash_store) ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/burst/job.rb', line 17

# Loads the given attributes into the model and fills in missing defaults.
#
# A deep copy of hash_store is handed to set_model (defined outside this
# block), so the hash passed in is not shared with the job. Afterwards
# every attribute that is still nil/false gets its default:
# a fresh UUID for id, the owning workflow's id, this object's class name,
# and empty arrays for incoming and outgoing.
#
# @param hash_store [Hash] attribute values to start from
# @return [Object] the value of the final `outgoing ||= []` expression
def assign_default_values(hash_store)
  private_copy = hash_store.deep_dup
  set_model(private_copy)

  # Identity and ownership.
  self.id ||= SecureRandom.uuid
  self.workflow_id ||= @workflow.id
  self.klass ||= self.class.to_s

  # Dependency edges.
  self.incoming ||= []
  self.outgoing ||= []
end

#attributes ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/burst/job.rb', line 77

# Collects the job's attributes into a plain Hash.
#
# Keys appear in a fixed order: identity (workflow_id, id, klass), params,
# dependency edges (incoming, outgoing), output, then the lifecycle marks.
#
# @return [Hash{Symbol => Object}] current value of every attribute
def attributes
  attribute_names = %i[
    workflow_id id klass params incoming outgoing output
    started_at enqueued_at finished_at failed_at suspended_at resumed_at
  ]
  # Each value is read through the attribute's own reader method.
  attribute_names.each_with_object({}) do |name, snapshot|
    snapshot[name] = send(name)
  end
end

#configure ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/burst/job.rb', line 61

# Runs the given block while holding the workflow lock, then persists the
# result.
#
# Inside the lock, in this order: the block is yielded to, the workflow
# resolves its dependencies and is saved, every job returned by
# @workflow.all_jobs is saved, and finally this job is reloaded.
#
# A block is required (the yield is unconditional).
#
# @yield the configuration to apply to the workflow
# @return [Object] whatever @workflow.with_lock returns
def configure
  @workflow.with_lock do
    yield

    @workflow.resolve_dependencies
    @workflow.save!
    @workflow.all_jobs.to_a.each { |job| job.save! }

    reload
  end
end

#current_timestamp ⇒ Object



176
177
178
# File 'lib/burst/job.rb', line 176

# Current wall-clock time as whole seconds since the Unix epoch.
#
# @return [Integer]
def current_timestamp
  now = Time.now
  now.to_i
end

#enqueue! ⇒ Object

Marks the job as enqueued when it is scheduled onto the queue.

Raises:

  • (Error)



96
97
98
99
100
101
102
103
104
# File 'lib/burst/job.rb', line 96

# Marks the job as enqueued and wipes every later lifecycle mark.
#
# enqueued_at receives the current timestamp; started_at, finished_at,
# failed_at, suspended_at and resumed_at are reset to nil, in that order.
#
# @raise [Error] with message 'Already enqueued' when enqueued? is true
# @return [nil]
def enqueue!
  raise Error, 'Already enqueued' if enqueued?

  self.enqueued_at = current_timestamp

  # Marks left over from any previous run.
  %i[started_at= finished_at= failed_at= suspended_at= resumed_at=].each do |setter|
    send(setter, nil)
  end
  nil
end

#enqueued? ⇒ Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/burst/job.rb', line 136

# Whether the job carries an enqueue mark.
#
# @return [Boolean] true when enqueued_at is anything other than nil
def enqueued?
  mark = enqueued_at
  !mark.nil?
end

#fail! ⇒ Object

Marks the job as failed when its execution fails.

Raises:

  • (Error)



119
120
121
122
# File 'lib/burst/job.rb', line 119

# Marks the job as failed.
#
# A failed job also counts as finished: failed_at and finished_at both
# receive the same timestamp (failed_at is written first).
#
# @raise [Error] with message 'Already failed' when failed? is true
# @return [Object] the timestamp that was stored
def fail!
  raise Error, 'Already failed' if failed?

  now = current_timestamp
  self.failed_at = now
  self.finished_at = now
end

#failed? ⇒ Boolean

Returns:

  • (Boolean)


152
153
154
# File 'lib/burst/job.rb', line 152

# Whether the job carries a failure mark.
#
# @return [Boolean] true when failed_at is anything other than nil
def failed?
  mark = failed_at
  !mark.nil?
end

#finish! ⇒ Object

Marks the job as finished when it has finished performing.

Raises:

  • (Error)



113
114
115
116
# File 'lib/burst/job.rb', line 113

# Marks the job as finished by stamping finished_at with the current time.
#
# @raise [Error] with message 'Already finished' when finished? is true
# @return [Object] the timestamp that was stored
def finish!
  raise Error, 'Already finished' if finished?

  self.finished_at = current_timestamp
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/burst/job.rb', line 144

# Whether the job carries a finish mark (fail! sets one as well).
#
# @return [Boolean] true when finished_at is anything other than nil
def finished?
  mark = finished_at
  !mark.nil?
end

#initial? ⇒ Boolean

Returns:

  • (Boolean)


172
173
174
# File 'lib/burst/job.rb', line 172

# Whether the job has no incoming edges, i.e. nothing it has to wait for.
#
# @return [Boolean] true when incoming is empty
def initial?
  parent_ids = incoming
  parent_ids.empty?
end

#parents_succeeded? ⇒ Boolean

Returns:

  • (Boolean)


180
181
182
183
184
# File 'lib/burst/job.rb', line 180

# Whether every job listed in incoming has succeeded.
#
# Each id is looked up through @workflow.get_job; the scan stops at the
# first parent that has not succeeded. With no incoming ids the answer is
# true.
#
# @return [Boolean]
def parents_succeeded?
  incoming.all? { |parent_id| @workflow.get_job(parent_id).succeeded? }
end

#perform ⇒ Object

This code is executed by ActiveJob. To suspend the job, either return Burst::Job::SUSPEND or call the suspend method.



44
# File 'lib/burst/job.rb', line 44

# The job's work. The base implementation does nothing and returns nil;
# it is an empty hook, presumably meant to be overridden by subclasses.
#
# @return [nil]
def perform
end

#ready_to_start? ⇒ Boolean

Returns:

  • (Boolean)


168
169
170
# File 'lib/burst/job.rb', line 168

# Whether the job may be started now.
#
# A job is ready when it is not running, not enqueued, not finished and
# not failed, and all of its parents have succeeded. The checks run in
# that order and stop at the first one that rules the job out.
#
# @return [Boolean]
def ready_to_start?
  return false if running? || enqueued? || finished? || failed?

  parents_succeeded?
end

#reload ⇒ Object



27
28
29
# File 'lib/burst/job.rb', line 27

# Refreshes this job from the hash the workflow holds for its id.
#
# The hash returned by @workflow.get_job_hash is passed through
# #assign_default_values, so missing attributes get their defaults again.
#
# @return [Object] whatever #assign_default_values returns
def reload
  stored = @workflow.get_job_hash(id)
  assign_default_values(stored)
end

#resume(data) ⇒ Object

Executed when the job resumes after being suspended.



47
48
49
# File 'lib/burst/job.rb', line 47

# Hook run when the job resumes after a suspension. The default
# implementation stores the given data as the job's output.
#
# @param data [Object] value supplied on resume
# @return [Object] whatever #set_output returns
def resume(data)
  set_output(data)
end

#resume! ⇒ Object

mark job as resumed

Raises:

  • (Error)



130
131
132
133
134
# File 'lib/burst/job.rb', line 130

# Marks a suspended job as resumed by stamping resumed_at with the current
# time.
#
# The resumed? check has to come first: suspended? is defined as
# "has a suspension mark AND is not resumed", so once a job is resumed it
# no longer counts as suspended. Checking suspended? first would report
# 'Not suspended ' for an already-resumed job and make the
# 'Already resumed ' error unreachable.
#
# @raise [Error] 'Already resumed ' when the job has already been resumed
# @raise [Error] 'Not suspended ' when the job is not currently suspended
# @return [Object] the timestamp that was stored
def resume!
  raise Error, 'Already resumed ' if resumed?
  raise Error, 'Not suspended ' unless suspended?

  self.resumed_at = current_timestamp
end

#resumed? ⇒ Boolean

Returns:

  • (Boolean)


160
161
162
# File 'lib/burst/job.rb', line 160

# Whether the job carries a resume mark.
#
# @return [Boolean] true when resumed_at is anything other than nil
def resumed?
  mark = resumed_at
  !mark.nil?
end

#run(klass, opts = {}) ⇒ Object



71
72
73
74
75
# File 'lib/burst/job.rb', line 71

# Schedules another job class in the workflow, placed right after this job.
#
# The new job is made to run after this job (this job's id is appended to
# :after) and before everything this job currently points to (this job's
# outgoing ids are appended to :before). Duplicates are removed from both
# lists. Existing :after/:before values may be a single id or an array.
#
# The caller's opts hash is not modified: a merged copy is passed on, so
# frozen or reused option hashes are safe.
#
# @param klass [Object] job class to schedule; passed through to @workflow.run
# @param opts [Hash] options for @workflow.run; :after and :before are
#   extended as described above, every other key is passed through as is
# @return [Object] whatever @workflow.run returns
def run(klass, opts = {})
  placement = {
    after: [*opts[:after], id].uniq,
    before: [*opts[:before], *outgoing].uniq
  }
  @workflow.run(klass, opts.merge(placement))
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


148
149
150
# File 'lib/burst/job.rb', line 148

# Whether the job is currently executing: it has started and is neither
# finished nor suspended.
#
# @return [Boolean]
def running?
  started? && !(finished? || suspended?)
end

#save! ⇒ Object



31
32
33
34
35
36
37
# File 'lib/burst/job.rb', line 31

# Stores this job in its workflow and saves the workflow, all while
# holding the workflow lock.
#
# When a block is given it is run after the save, still inside the lock.
#
# @yield optional follow-up work to perform inside the same lock
# @return [Object] whatever @workflow.with_lock returns
def save!
  @workflow.with_lock do
    @workflow.set_job(self)
    @workflow.save!
    next unless block_given?

    yield
  end
end

#set_output(data) ⇒ Object

store data to be available for next jobs



52
53
54
# File 'lib/burst/job.rb', line 52

# Stores data as this job's output so that it is available to the jobs
# that run next.
#
# @param data [Object] value to store
# @return [Object] the value that was assigned
def set_output(data)
  self.output = data
end

#start! ⇒ Object

Marks the job as started when it starts performing.

Raises:

  • (Error)



107
108
109
110
# File 'lib/burst/job.rb', line 107

# Marks the job as started by stamping started_at with the current time.
#
# @raise [Error] with message 'Already started' when started? is true
# @return [Object] the timestamp that was stored
def start!
  raise Error, 'Already started' if started?

  self.started_at = current_timestamp
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


140
141
142
# File 'lib/burst/job.rb', line 140

# Whether the job carries a start mark.
#
# @return [Boolean] true when started_at is anything other than nil
def started?
  mark = started_at
  !mark.nil?
end

#succeeded? ⇒ Boolean

Returns:

  • (Boolean)


164
165
166
# File 'lib/burst/job.rb', line 164

# Whether the job finished without failing. (fail! marks a job as both
# failed and finished, so finished? alone is not enough.)
#
# @return [Boolean]
def succeeded?
  return false unless finished?

  !failed?
end

#suspend ⇒ Object

mark execution as suspended



57
58
59
# File 'lib/burst/job.rb', line 57

# Flags the current execution as suspended by storing the SUSPEND marker
# ('suspend') as the job's output.
#
# @return [Object] whatever #set_output returns
def suspend
  set_output(SUSPEND)
end

#suspend! ⇒ Object

mark job as suspended



125
126
127
# File 'lib/burst/job.rb', line 125

# Marks the job as suspended by stamping suspended_at with the current
# time. Unlike start!/finish!/fail!, there is no guard against calling it
# more than once.
#
# @return [Object] the timestamp that was stored
def suspend!
  now = current_timestamp
  self.suspended_at = now
end

#suspended? ⇒ Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/burst/job.rb', line 156

# Whether the job is currently suspended: it has a suspension mark and has
# not been resumed since.
#
# @return [Boolean]
def suspended?
  !(suspended_at.nil? || resumed?)
end