Module: OpenWFE::JournalReplay

Included in:
Journal
Defined in:
lib/openwfe/expool/journal_replay.rb

Overview

The code decicated to replay and reconstitute journal.

Defined Under Namespace

Classes: ExpoolState

Instance Method Summary collapse

Instance Method Details

#analyze(file_path) ⇒ Object

Outputs a report of the each of the main events that the journal traced.

The output goes to the stdout.

The output can be used to determine an offset number for a replay() of the journal.



117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/openwfe/expool/journal_replay.rb', line 117

def analyze (file_path)

    states = decompose(file_path)

    states.each do |state|

        next if state.dynamic.length < 1

        puts
        puts state.to_s
        puts
    end
end

#decompose(file_path) ⇒ Object

Decomposes the given file_path into a list of states



134
135
136
137
# File 'lib/openwfe/expool/journal_replay.rb', line 134

def decompose (file_path)

    do_decompose(load_events(file_path), [], nil, 0)
end

#load_events(file_path) ⇒ Object

Loads a journal file and return the content as a list of events. This method is made available for unit tests, as a public method it has not much interest.



144
145
146
147
148
149
150
# File 'lib/openwfe/expool/journal_replay.rb', line 144

def load_events (file_path)

    File.open(file_path) do |f|
        s = YAML.load_stream f
        s.documents
    end
end

#replay(file_path, offset, trigger_action = false) ⇒ Object

Replays a given journal file.

The offset can be determined by running the analyze() method.

If ‘trigger_action’ is set to true, the apply or reply or cancel action found at the given offset will be triggered.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/openwfe/expool/journal_replay.rb', line 60

def replay (file_path, offset, trigger_action=false)

    states = decompose(file_path)

    state = nil

    states.each do |s|
        state = s if s.offset == offset
    end

    raise "cannot replay offset #{offset}" unless state

    #puts "expstorage size 0 = #{get_expression_storage.size}"

    state.static.each do |update|
        flow_expression = update[3]
        flow_expression.application_context = @application_context
        get_expression_pool.update(flow_expression)
    end

    get_expression_pool.reschedule

    #puts "expstorage size 1 = #{get_expression_storage.size}"

    return unless trigger_action

    #puts "sds : #{state.dynamic.size}"

    state.dynamic.each do |ply|

        message = ply[0]
        fei = extract_fei(ply[2])
        wi = ply[3]

        if wi
            #
            # apply, reply, reply_to_parent
            #
            get_expression_pool.send message, fei, wi
        else
            #
            # cancel
            #
            get_expression_pool.send message, fei
        end
    end
end

#replay_at_error(error_source_event) ⇒ Object

Takes an error event (as stored in the journal) and replays it (usually you’d have to fix the engine conf before replaying the error trigger)

(Make sure to fix the cause of the error before triggering this method)



160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/openwfe/expool/journal_replay.rb', line 160

def replay_at_error (error_source_event)

    get_expression_pool.queue_work \
        error_source_event[3], # message (:do_apply for example)
        error_source_event[2], # fei or exp
        error_source_event[4]  # workitem

    # 0 is :error and 1 is the date and time of the error

    linfo do
        fei = extract_fei(error_source_event[2])
        "replay_at_error() #{error_source_event[3]} #{fei}"
    end
end

#replay_at_last_error(wfid) ⇒ Object

Detects the last error that ocurred for a workflow instance and replays at that point (see replay_at_error).

(Make sure to fix the cause of the error before triggering this method)



182
183
184
185
186
187
188
189
190
191
# File 'lib/openwfe/expool/journal_replay.rb', line 182

def replay_at_last_error (wfid)

    events = load_events(get_path(wfid))

    error_event = events.reverse.find do |evt|
        evt[0] == :error
    end

    replay_at_error(error_event)
end