Class: Burstflow::Workflow

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Callbacks, Configuration
Defined in:
lib/burstflow/workflow.rb

Defined Under Namespace

Modules: Callbacks, Configuration Classes: Builder, InternalError

Constant Summary collapse

INITIAL =
'initial'.freeze
RUNNING =
'running'.freeze
FINISHED =
'finished'.freeze
FAILED =
'failed'.freeze
SUSPENDED =
'suspended'.freeze
STATUSES =
[INITIAL, RUNNING, FINISHED, FAILED, SUSPENDED].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#cacheObject

Returns the value of attribute cache.



25
26
27
# File 'lib/burstflow/workflow.rb', line 25

# Reader for the +@cache+ instance variable.
#
# Other methods of this class index the returned object like a Hash to
# memoize job lookups (for example +cache[:initial_jobs]+), and #reload
# replaces it with an empty Hash.
#
# @return [Object] whatever is currently stored in +@cache+
def cache
  @cache
end

#managerObject

Returns the value of attribute manager.



25
26
27
# File 'lib/burstflow/workflow.rb', line 25

# Reader for the +@manager+ instance variable.
#
# #start! and #resume! delegate to the returned object through its
# +start_workflow!+ and +resume_workflow!+ methods.
#
# @return [Object] whatever is currently stored in +@manager+
def manager
  @manager
end

Class Method Details

.build(*args) ⇒ Object



55
56
57
58
59
60
# File 'lib/burstflow/workflow.rb', line 55

# Instantiates a workflow and fills in its job configuration.
#
# A Builder is run over the fresh instance with the given arguments and the
# class-level +configuration+ passed as its block; the builder's JSON form is
# then stored under the 'jobs_config' key of the workflow's +flow+.
#
# @param args [Array] forwarded to the Builder after the workflow itself
# @return [Object] the newly instantiated workflow
def self.build(*args)
  workflow = new
  builder = Burstflow::Workflow::Builder.new(workflow, *args, &configuration)
  workflow.flow = { 'jobs_config' => builder.as_json }
  workflow
end

Instance Method Details

#add_error(job_orexception) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/burstflow/workflow.rb', line 101

# Records a failure entry on the workflow.
#
# Builds a context Hash stamped with the current epoch seconds and appends it
# to +failures+. For an exception the entry carries its message, class name,
# up to ten backtrace lines and its cause; any other argument is treated as a
# job and only its +id+ is recorded.
#
# @param job_orexception [Exception, #id] the exception, or a job-like object
# @return [Object] the result of +failures.push+
def add_error(job_orexception)
  context = { created_at: Time.now.to_i }

  if job_orexception.is_a?(::Exception)
    context[:message]   = job_orexception.message
    context[:klass]     = job_orexception.class.to_s
    # Exception#backtrace is nil for an exception that was never raised;
    # Array() turns that into an empty list instead of failing on nil.first.
    context[:backtrace] = Array(job_orexception.backtrace).first(10)
    context[:cause]     = job_orexception.cause
  else
    context[:job] = job_orexception.id
  end

  failures.push(context)
end

#attributesObject



45
46
47
48
49
50
51
52
53
# File 'lib/burstflow/workflow.rb', line 45

# Summarises the workflow as a Hash with symbol keys.
#
# @return [Hash] +:id+, +:jobs_config+, +:type+ (the class name as a String),
#   +:status+ and +:failures+, in that order
def attributes
  workflow_type = self.class.to_s

  { id: id, jobs_config: jobs_config, type: workflow_type, status: status, failures: failures }
end

#complete!Object



131
132
133
134
135
136
137
138
139
# File 'lib/burstflow/workflow.rb', line 131

# Moves the workflow to its terminal (or paused) state.
#
# Recorded errors take precedence and fail the workflow; otherwise suspended
# jobs suspend it; otherwise it is finished.
#
# @return [Object] the result of the transition method that was invoked
def complete!
  return failed! if has_errors?
  return suspended! if has_suspended_jobs?

  finished!
end

#failed!Object



165
166
167
168
169
170
171
172
173
# File 'lib/burstflow/workflow.rb', line 165

# Transitions the workflow to FAILED and saves it, inside the +:failure+
# callbacks.
#
# Allowed only from the running or suspended state. The message texts are
# kept verbatim, spelling included.
#
# @raise [InternalError] if already failed, already finished, or neither
#   running nor suspended
# @return [Object] the result of +run_callbacks+
def failed!
  run_callbacks(:failure) do
    problem =
      if failed?
        "Can't fail: workflow already failed"
      elsif finished?
        "Can't fail: workflow already finished"
      elsif !running? && !suspended?
        "Can't fail: workflow in not runnig"
      end
    raise InternalError.new(self, problem) if problem

    self.status = FAILED
    save!
  end
end

#finished!Object



175
176
177
178
179
180
181
182
183
# File 'lib/burstflow/workflow.rb', line 175

# Transitions the workflow to FINISHED and saves it, inside the +:finish+
# callbacks.
#
# Allowed only from the running state. The message texts are kept verbatim,
# spelling included.
#
# @raise [InternalError] if already finished, already failed, or not running
# @return [Object] the result of +run_callbacks+
def finished!
  run_callbacks(:finish) do
    problem =
      if finished?
        "Can't finish: workflow already finished"
      elsif failed?
        "Can't finish: workflow already failed"
      elsif !running?
        "Can't finish: workflow in not runnig"
      end
    raise InternalError.new(self, problem) if problem

    self.status = FINISHED
    save!
  end
end

#finished_atObject



153
154
155
# File 'lib/burstflow/workflow.rb', line 153

# Finish time of the workflow, taken from #last_job.
#
# @return [Object, nil] the last job's +finished_at+, or nil when #last_job
#   returns nil
def finished_at
  job = last_job
  job.finished_at unless job.nil?
end

#first_jobObject



141
142
143
# File 'lib/burstflow/workflow.rb', line 141

# Job with the smallest +started_at+ among +all_jobs+.
#
# A job without a +started_at+ is ranked as if it started at the current
# epoch second.
#
# @return [Object, nil] the earliest job, or nil when there are no jobs
def first_job
  all_jobs.min_by do |job|
    job.started_at || Time.now.to_i
  end
end

#has_errors?Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/burstflow/workflow.rb', line 117

# Whether any failure has been recorded on the workflow.
#
# @return [Boolean] true when +failures+ contains at least one truthy entry
def has_errors?
  recorded = failures
  recorded.any?
end

#has_scheduled_jobs?Boolean

Returns:

  • (Boolean)


121
122
123
124
125
# File 'lib/burstflow/workflow.rb', line 121

# Whether any job is scheduled, or is still initial and not yet enqueued.
#
# The answer is memoized in +cache+ under +:has_scheduled_jobs+. Because the
# memoization uses +||=+, only a truthy answer sticks; a false answer is
# recomputed on the next call.
#
# @return [Boolean]
def has_scheduled_jobs?
  cache[:has_scheduled_jobs] ||= jobs.any? do |candidate|
    next true if candidate.scheduled?

    candidate.initial? && !candidate.enqueued?
  end
end

#has_suspended_jobs?Boolean

Returns:

  • (Boolean)


127
128
129
# File 'lib/burstflow/workflow.rb', line 127

# Whether any job is suspended.
#
# The answer is memoized in +cache+ under +:has_suspended_jobs+. Because the
# memoization uses +||=+, only a truthy answer sticks.
#
# @return [Boolean]
def has_suspended_jobs?
  cache[:has_suspended_jobs] ||= jobs.any? { |job| job.suspended? }
end

#initial_jobsObject



97
98
99
# File 'lib/burstflow/workflow.rb', line 97

# Jobs that report +initial?+.
#
# The list is memoized in +cache+ under +:initial_jobs+.
#
# @return [Array] the selected jobs
def initial_jobs
  cache[:initial_jobs] ||= jobs.select { |job| job.initial? }
end

#job(id) ⇒ Object



89
90
91
# File 'lib/burstflow/workflow.rb', line 89

# Builds a Burstflow::Job for the given id from its stored configuration.
#
# @param id [Object] key of the job in +jobs_config+
# @return [Object] the result of +Burstflow::Job.from_hash+
def job(id)
  config = job_hash(id)
  Burstflow::Job.from_hash(self, config)
end

#job_hash(id) ⇒ Object



85
86
87
# File 'lib/burstflow/workflow.rb', line 85

# Deep copy of the stored configuration for one job.
#
# @param id [Object] key of the job in +jobs_config+
# @return [Object] the result of +deep_dup+ on the stored entry
def job_hash(id)
  stored = jobs_config[id]
  stored.deep_dup
end

#jobsObject



77
78
79
80
81
82
83
# File 'lib/burstflow/workflow.rb', line 77

# Lazily enumerates the workflow's jobs.
#
# Nothing is built until the enumerator is consumed; each job is then created
# through #job, iterating over a snapshot of the +jobs_config+ keys.
#
# @return [Enumerator] yields one job per configured id
def jobs
  Enumerator.new do |yielder|
    job_ids = jobs_config.keys
    job_ids.each { |job_id| yielder.yield(job(job_id)) }
  end
end

#last_jobObject



145
146
147
# File 'lib/burstflow/workflow.rb', line 145

# Job with the largest +finished_at+ among +all_jobs+, available only once
# the workflow is finished.
#
# A job without a +finished_at+ is ranked as 0.
#
# @return [Object, nil] the latest job, or nil when the workflow is not
#   finished or has no jobs
def last_job
  return unless finished?

  all_jobs.max_by { |job| job.finished_at || 0 }
end

#reloadObject



62
63
64
65
# File 'lib/burstflow/workflow.rb', line 62

# Resets the memoization cache and then delegates to the inherited +reload+
# (presumably ActiveRecord's, given the documented superclass).
#
# Any arguments are accepted and passed along untouched: the bare +super+
# forwards exactly what this method received.
#
# @return [Object] whatever the inherited +reload+ returns
def reload(*)
  # Drop memoized job lookups so they are recomputed after reloading.
  self.cache = {}
  super
end

#resume!(job_id, data) ⇒ Object



72
73
74
75
# File 'lib/burstflow/workflow.rb', line 72

# Asks the manager to resume the workflow for the given job.
#
# @param job_id [Object] forwarded to +manager.resume_workflow!+
# @param data [Object] forwarded to +manager.resume_workflow!+
# @return [self] the workflow, to allow chaining
def resume!(job_id, data)
  tap { manager.resume_workflow!(job_id, data) }
end

#resumed!Object



195
196
197
198
199
200
201
202
203
204
# File 'lib/burstflow/workflow.rb', line 195

# Transitions the workflow from suspended back to RUNNING and saves it,
# inside the +:resume+ callbacks.
#
# The message texts are kept verbatim, spelling included.
#
# @raise [InternalError] if already running, already finished, already
#   failed, or not suspended
# @return [Object] the result of +run_callbacks+
def resumed!
  run_callbacks(:resume) do
    problem =
      if running?
        "Can't resume: workflow already running"
      elsif finished?
        "Can't resume: workflow already finished"
      elsif failed?
        "Can't resume: workflow already failed"
      elsif !suspended?
        "Can't resume: workflow in not suspended"
      end
    raise InternalError.new(self, problem) if problem

    self.status = RUNNING
    save!
  end
end

#runnig!Object

Raises:

  • (InternalError)


157
158
159
160
161
162
163
# File 'lib/burstflow/workflow.rb', line 157

# Transitions the workflow to RUNNING and saves it.
#
# Unlike the other transitions shown here, this one is not wrapped in
# +run_callbacks+. The method name is spelled as the callers know it.
#
# @raise [InternalError] if the workflow is running or suspended, already
#   failed, or already finished
# @return [Object] the result of +save!+
def runnig!
  problem =
    if running? || suspended?
      "Can't start: workflow already running"
    elsif failed?
      "Can't start: workflow already failed"
    elsif finished?
      "Can't start: workflow already finished"
    end
  raise InternalError.new(self, problem) if problem

  self.status = RUNNING
  save!
end

#set_job(job) ⇒ Object



93
94
95
# File 'lib/burstflow/workflow.rb', line 93

# Stores a job's JSON form in +jobs_config+ under the job's id, replacing any
# existing entry for that id.
#
# @param job [#id, #as_json] the job to store
# @return [Object] the stored JSON form
def set_job(job)
  config = jobs_config
  config[job.id] = job.as_json
end

#start!Object



67
68
69
70
# File 'lib/burstflow/workflow.rb', line 67

# Asks the manager to start the workflow.
#
# @return [self] the workflow, to allow chaining
def start!
  tap { manager.start_workflow! }
end

#started_atObject



149
150
151
# File 'lib/burstflow/workflow.rb', line 149

# Start time of the workflow, taken from #first_job.
#
# @return [Object, nil] the first job's +started_at+, or nil when #first_job
#   returns nil
def started_at
  job = first_job
  job.started_at unless job.nil?
end

#suspended!Object



185
186
187
188
189
190
191
192
193
# File 'lib/burstflow/workflow.rb', line 185

# Transitions the workflow to SUSPENDED and saves it, inside the +:suspend+
# callbacks.
#
# Allowed only from the running state. The message texts are kept verbatim,
# spelling included.
#
# @raise [InternalError] if already finished, already failed, or not running
# @return [Object] the result of +run_callbacks+
def suspended!
  run_callbacks(:suspend) do
    problem =
      if finished?
        "Can't suspend: workflow already finished"
      elsif failed?
        "Can't suspend: workflow already failed"
      elsif !running?
        "Can't suspend: workflow in not runnig"
      end
    raise InternalError.new(self, problem) if problem

    self.status = SUSPENDED
    save!
  end
end