Module: OpenWFE::JournalReplay
- Included in:
- Journal
- Defined in:
- lib/openwfe/expool/journal_replay.rb
Overview
The code decicated to replay and reconstitute journal.
Defined Under Namespace
Classes: ExpoolState
Instance Method Summary collapse
-
#analyze(file_path) ⇒ Object
Outputs a report of the each of the main events that the journal traced.
-
#decompose(file_path) ⇒ Object
Decomposes the given file_path into a list of states.
-
#load_events(file_path) ⇒ Object
Loads a journal file and return the content as a list of events.
-
#replay(file_path, offset, trigger_action = false) ⇒ Object
Replays a given journal file.
-
#replay_at_error(error_source_event) ⇒ Object
Takes an error event (as stored in the journal) and replays it (usually you’d have to fix the engine conf before replaying the error trigger).
-
#replay_at_last_error(wfid) ⇒ Object
Detects the last error that ocurred for a workflow instance and replays at that point (see replay_at_error).
Instance Method Details
#analyze(file_path) ⇒ Object
Outputs a report of the each of the main events that the journal traced.
The output goes to the stdout.
The output can be used to determine an offset number for a replay() of the journal.
117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/openwfe/expool/journal_replay.rb', line 117 def analyze (file_path) states = decompose(file_path) states.each do |state| next if state.dynamic.length < 1 puts puts state.to_s puts end end |
#decompose(file_path) ⇒ Object
Decomposes the given file_path into a list of states
134 135 136 137 |
# File 'lib/openwfe/expool/journal_replay.rb', line 134 def decompose (file_path) do_decompose(load_events(file_path), [], nil, 0) end |
#load_events(file_path) ⇒ Object
Loads a journal file and return the content as a list of events. This method is made available for unit tests, as a public method it has not much interest.
144 145 146 147 148 149 150 |
# File 'lib/openwfe/expool/journal_replay.rb', line 144 def load_events (file_path) File.open(file_path) do |f| s = YAML.load_stream f s.documents end end |
#replay(file_path, offset, trigger_action = false) ⇒ Object
Replays a given journal file.
The offset can be determined by running the analyze() method.
If ‘trigger_action’ is set to true, the apply or reply or cancel action found at the given offset will be triggered.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/openwfe/expool/journal_replay.rb', line 60 def replay (file_path, offset, trigger_action=false) states = decompose(file_path) state = nil states.each do |s| state = s if s.offset == offset end raise "cannot replay offset #{offset}" unless state #puts "expstorage size 0 = #{get_expression_storage.size}" state.static.each do |update| flow_expression = update[3] flow_expression.application_context = @application_context get_expression_pool.update(flow_expression) end get_expression_pool.reschedule #puts "expstorage size 1 = #{get_expression_storage.size}" return unless trigger_action #puts "sds : #{state.dynamic.size}" state.dynamic.each do |ply| = ply[0] fei = extract_fei(ply[2]) wi = ply[3] if wi # # apply, reply, reply_to_parent # get_expression_pool.send , fei, wi else # # cancel # get_expression_pool.send , fei end end end |
#replay_at_error(error_source_event) ⇒ Object
Takes an error event (as stored in the journal) and replays it (usually you’d have to fix the engine conf before replaying the error trigger)
(Make sure to fix the cause of the error before triggering this method)
160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/openwfe/expool/journal_replay.rb', line 160 def replay_at_error (error_source_event) get_expression_pool.queue_work \ error_source_event[3], # message (:do_apply for example) error_source_event[2], # fei or exp error_source_event[4] # workitem # 0 is :error and 1 is the date and time of the error linfo do fei = extract_fei(error_source_event[2]) "replay_at_error() #{error_source_event[3]} #{fei}" end end |
#replay_at_last_error(wfid) ⇒ Object
Detects the last error that ocurred for a workflow instance and replays at that point (see replay_at_error).
(Make sure to fix the cause of the error before triggering this method)
182 183 184 185 186 187 188 189 190 191 |
# File 'lib/openwfe/expool/journal_replay.rb', line 182 def replay_at_last_error (wfid) events = load_events(get_path(wfid)) error_event = events.reverse.find do |evt| evt[0] == :error end replay_at_error(error_event) end |