Class: Yawl::Step
- Inherits:
-
Sequel::Model
- Object
- Sequel::Model
- Yawl::Step
- Defined in:
- lib/yawl/step.rb
Defined Under Namespace
Class Method Summary collapse
Instance Method Summary collapse
- #duration ⇒ Object
- #execute ⇒ Object
- #handle_error(e) ⇒ Object
- #log(data, &block) ⇒ Object
- #out_of_attempts? ⇒ Boolean
- #real_step ⇒ Object
- #start ⇒ Object
- #start_after_delay ⇒ Object
- #start_in(seconds) ⇒ Object
- #to_public_h ⇒ Object
Class Method Details
.execute(id) ⇒ Object
32 33 34 |
# File 'lib/yawl/step.rb', line 32 def self.execute(id) self[id].execute end |
.log(data, &block) ⇒ Object
24 25 26 |
# File 'lib/yawl/step.rb', line 24 def self.log(data, &block) Log.log({ ns: "yawl-step" }.merge(data), &block) end |
.restart_interrupted ⇒ Object
36 37 38 39 40 |
# File 'lib/yawl/step.rb', line 36 def self.restart_interrupted where(:state => "interrupted").each do |step| step.start end end |
Instance Method Details
#duration ⇒ Object
42 43 44 45 46 |
# File 'lib/yawl/step.rb', line 42 def duration if attempts.any? (attempts.last.completed_at || Time.now) - attempts.first.started_at end end |
#execute ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/yawl/step.rb', line 71 def execute log(fn: "execute") do begin # TODO(dpiddy): transactions here? update(:state => "executing") attempt = add_attempt(:started_at => Time.now) real_step.run_with_log log(fn: "execute", at: "run_completed") attempt.update(:output => real_step.output, :completed_at => Time.now) update(:state => "completed") process.step_finished rescue Step::Tired => e log(:fn => "execute", :at => "sleep") attempt.update(:output => "#{real_step.output}\n\n---\n#{e}\n", :completed_at => Time.now) handle_error(e) rescue Step::Fatal, StandardError, SignalException => e attempt.update(:output => "#{real_step.output}\n\n---\nCAUGHT ERROR: #{e}\n#{e.backtrace.join("\n")}\n", :completed_at => Time.now) log(:fn => "execute", :at => "caught_exception", :class => e.class, :message => e.) handle_error(e) end end end |
#handle_error(e) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/yawl/step.rb', line 95 def handle_error(e) if out_of_attempts? || e.is_a?(Step::Fatal) update(:state => "failed") process.step_failed raise elsif SignalException === e && e.signm == "SIGTERM" # we are shutting down update(:state => "interrupted") raise else update(:state => "pending") start_after_delay end end |
#log(data, &block) ⇒ Object
28 29 30 |
# File 'lib/yawl/step.rb', line 28 def log(data, &block) self.class.log({ process_id: process.id, step_id: id, step_name: name }.merge(data), &block) end |
#out_of_attempts? ⇒ Boolean
67 68 69 |
# File 'lib/yawl/step.rb', line 67 def out_of_attempts? attempts.count >= real_step.attempts end |
#real_step ⇒ Object
63 64 65 |
# File 'lib/yawl/step.rb', line 63 def real_step @real_step ||= Steps.for(name, self) end |
#start ⇒ Object
48 49 50 51 52 |
# File 'lib/yawl/step.rb', line 48 def start log(fn: "start") update(:state => "pending") QC.enqueue("Yawl::Step.execute", id) end |
#start_after_delay ⇒ Object
59 60 61 |
# File 'lib/yawl/step.rb', line 59 def start_after_delay start_in(real_step.delay_between_attempts) end |
#start_in(seconds) ⇒ Object
54 55 56 57 |
# File 'lib/yawl/step.rb', line 54 def start_in(seconds) log(fn: "start_in", seconds: seconds) QC.enqueue_in(seconds, "Yawl::Step.execute", id) end |
#to_public_h ⇒ Object
109 110 111 112 113 114 115 |
# File 'lib/yawl/step.rb', line 109 def to_public_h { "seq" => seq, "name" => name, "state" => state } end |