21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/workflow_rb/services/workflow_executor.rb', line 21
# Runs a single execution pass over every active execution pointer of
# +workflow+, then schedules the next run and persists the workflow.
#
# For each active pointer the matching step is looked up in the workflow
# definition, a step body object is built, the step inputs are copied from
# the workflow data onto the body, and the body is run. When the result
# says to proceed, outputs are copied back to the workflow data, the
# pointer is closed, and one new active pointer is appended per matching
# outcome. Otherwise the pointer keeps the persistence data and wake-up
# time returned by the step so it can be resumed later.
#
# @param workflow the workflow instance to advance; it is mutated in place
#   (pointer state, data, and new execution pointers)
# @raise [RuntimeError] if the workflow definition is not registered, if a
#   pointer references a step that is missing from the definition, or if
#   the step body is neither a Proc nor a StepBody subclass
def execute(workflow)
  @logger.debug("Executing workflow #{workflow.id}")
  exe_pointers = workflow.execution_pointers.select { |x| x.active }
  definition = @registry.get_definition(workflow.definition_id, workflow.version)
  unless definition
    raise "Workflow definition #{workflow.definition_id} (version #{workflow.version}) not found"
  end

  exe_pointers.each do |pointer|
    step = definition.steps.find { |x| x.id == pointer.step_id }
    raise "Step #{pointer.step_id} not found in definition" unless step

    # A subscription step whose event has not arrived yet parks the pointer
    # and registers the subscription instead of running the step body.
    if step.kind_of?(SubscriptionStep) && !pointer.event_published
      pointer.event_name = step.event_name
      pointer.event_key = step.event_key
      pointer.active = false
      @host.subscribe_event(workflow.id, step.id, step.event_name, step.event_key)
      next
    end

    # Only stamp the start time on the first visit; later passes keep it.
    pointer.start_time ||= Time.now

    execution_context = StepExecutionContext.new
    execution_context.persistence_data = pointer.persistence_data
    execution_context.workflow = workflow
    execution_context.step = step

    body = step.body
    body_obj = nil
    if body.kind_of?(Proc)
      # Wrap an inline block in an anonymous StepBody subclass so it can be
      # driven through the same #run interface as class-based steps.
      body_class = Class.new(StepBody) do
        def initialize(body)
          @body = body
        end

        def run(context)
          @body.call(context)
        end
      end
      body_obj = body_class.new(body)
    elsif body.is_a?(Class) && body <= StepBody
      # The Class check keeps nil / non-class bodies from failing inside
      # `<=` so they reach the descriptive error below.
      body_obj = body.new
    end
    raise "Cannot construct step body #{step.body}" unless body_obj

    # Map workflow data onto the step body's input properties.
    step.inputs.each do |input|
      io_value = input.value.call(workflow.data)
      body_obj.send("#{input.property}=", io_value)
    end

    if body_obj.kind_of?(SubscriptionStepBody) && pointer.event_published
      body_obj.event_data = pointer.event_data
    end

    result = body_obj.run(execution_context)

    if result.proceed
      # Map the step body's output properties back onto the workflow data.
      step.outputs.each do |output|
        io_value = output.value.call(body_obj)
        workflow.data.send("#{output.property}=", io_value)
      end

      pointer.active = false
      pointer.end_time = Time.now

      # The pointer terminates its path only when no outcome matches;
      # otherwise each matching outcome forks a new active pointer.
      next_outcomes = step.outcomes.select { |x| x.value == result.outcome_value }
      pointer.path_terminator = next_outcomes.empty?
      next_outcomes.each.with_index(1) do |outcome, fork_counter|
        new_pointer = ExecutionPointer.new
        new_pointer.active = true
        new_pointer.step_id = outcome.next_step
        new_pointer.concurrent_fork = fork_counter * pointer.concurrent_fork
        workflow.execution_pointers << new_pointer
      end
    else
      # Step is not finished: remember its state and when to wake it up.
      pointer.persistence_data = result.persistence_data
      pointer.sleep_until = result.sleep_until
    end
  end

  determine_next_execution(workflow)
  @persistence.persist_workflow(workflow)
end
|