Class: Yawl::Step

Inherits:
Sequel::Model
  • Object
show all
Defined in:
lib/yawl/step.rb

Defined Under Namespace

Classes: Fatal, Tired

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.execute(id) ⇒ Object



32
33
34
# File 'lib/yawl/step.rb', line 32

def self.execute(id)
  self[id].execute
end

.log(data, &block) ⇒ Object



24
25
26
# File 'lib/yawl/step.rb', line 24

def self.log(data, &block)
  Log.log({ ns: "yawl-step" }.merge(data), &block)
end

.restart_interruptedObject



36
37
38
39
40
# File 'lib/yawl/step.rb', line 36

def self.restart_interrupted
  where(:state => "interrupted").each do |step|
    step.start
  end
end

Instance Method Details

#durationObject



42
43
44
45
46
# File 'lib/yawl/step.rb', line 42

def duration
  if attempts.any?
    (attempts.last.completed_at || Time.now) - attempts.first.started_at
  end
end

#executeObject



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/yawl/step.rb', line 71

def execute
  log(fn: "execute") do
    begin
      # TODO(dpiddy): transactions here?
      update(:state => "executing")
      attempt = add_attempt(:started_at => Time.now)
      real_step.run_with_log
      log(fn: "execute", at: "run_completed")
      attempt.update(:output => real_step.output, :completed_at => Time.now)
      update(:state => "completed")
      process.step_finished

    rescue Step::Tired => e
      log(:fn => "execute", :at => "sleep")
      attempt.update(:output => "#{real_step.output}\n\n---\n#{e}\n", :completed_at => Time.now)
      handle_error(e)
    rescue Step::Fatal, StandardError, SignalException => e
      attempt.update(:output => "#{real_step.output}\n\n---\nCAUGHT ERROR: #{e}\n#{e.backtrace.join("\n")}\n", :completed_at => Time.now)
      log(:fn => "execute", :at => "caught_exception", :class => e.class, :message => e.message)
      handle_error(e)
    end
  end
end

#handle_error(e) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/yawl/step.rb', line 95

def handle_error(e)
  if out_of_attempts? || e.is_a?(Step::Fatal)
    update(:state => "failed")
    process.step_failed
    raise
  elsif SignalException === e && e.signm == "SIGTERM" # we are shutting down
    update(:state => "interrupted")
    raise
  else
    update(:state => "pending")
    start_after_delay
  end
end

#log(data, &block) ⇒ Object



28
29
30
# File 'lib/yawl/step.rb', line 28

def log(data, &block)
  self.class.log({ process_id: process.id, step_id: id, step_name: name }.merge(data), &block)
end

#out_of_attempts?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/yawl/step.rb', line 67

def out_of_attempts?
  attempts.count >= real_step.attempts
end

#real_stepObject



63
64
65
# File 'lib/yawl/step.rb', line 63

def real_step
  @real_step ||= Steps.for(name, self)
end

#startObject



48
49
50
51
52
# File 'lib/yawl/step.rb', line 48

def start
  log(fn: "start")
  update(:state => "pending")
  QC.enqueue("Yawl::Step.execute", id)
end

#start_after_delayObject



59
60
61
# File 'lib/yawl/step.rb', line 59

def start_after_delay
  start_in(real_step.delay_between_attempts)
end

#start_in(seconds) ⇒ Object



54
55
56
57
# File 'lib/yawl/step.rb', line 54

def start_in(seconds)
  log(fn: "start_in", seconds: seconds)
  QC.enqueue_in(seconds, "Yawl::Step.execute", id)
end

#to_public_hObject



109
110
111
112
113
114
115
# File 'lib/yawl/step.rb', line 109

def to_public_h
  {
    "seq" => seq,
    "name" => name,
    "state" => state
  }
end