Class: SolidFlow::Workflow
- Inherits:
-
Object
- Object
- SolidFlow::Workflow
- Defined in:
- lib/solid_flow/workflow.rb
Overview
Base class that developers subclass to implement durable workflows. Provides a DSL for defining steps, signals, queries, and compensations.
Defined Under Namespace
Classes: StepDefinition
Instance Attribute Summary collapse
-
#ctx ⇒ Object
readonly
Returns the value of attribute ctx.
-
#execution ⇒ Object
readonly
Returns the value of attribute execution.
-
#history ⇒ Object
readonly
Returns the value of attribute history.
Class Method Summary collapse
- .compensate(step_name, with:) ⇒ Object
- .compensations ⇒ Object
- .defaults(options = {}) ⇒ Object
- .graph_signature ⇒ Object
- .inherited(subclass) ⇒ Object
- .on_query(name, method_name = nil, &block) ⇒ Object
- .on_signal(name, method_name = nil, &block) ⇒ Object
- .queries ⇒ Object
- .query(execution_id, query_name) ⇒ Object
- .query_handlers ⇒ Object
- .signal(execution_id, signal_name, payload = {}) ⇒ Object
- .signal_handlers ⇒ Object
- .signals ⇒ Object
- .start(**input) ⇒ Object
- .step(name, task: nil, **options, &block) ⇒ Object
- .steps ⇒ Object
- .workflow_compensations ⇒ Object
- .workflow_defaults ⇒ Object
- .workflow_name(value = nil) ⇒ Object
- .workflow_queries ⇒ Object
- .workflow_signals ⇒ Object
- .workflow_steps ⇒ Object
Instance Method Summary collapse
- #apply_signal(name, payload) ⇒ Object
- #cancel!(reason = "cancelled") ⇒ Object
- #cancelled? ⇒ Boolean
- #consume_wait_instructions ⇒ Object
-
#initialize(ctx:, execution:, history: []) ⇒ Workflow
constructor
A new instance of Workflow.
- #query_defined?(name) ⇒ Boolean
- #reset_wait_context! ⇒ Object
- #signal_defined?(name) ⇒ Boolean
- #wait ⇒ Object
Constructor Details
#initialize(ctx:, execution:, history: []) ⇒ Workflow
Returns a new instance of Workflow.
218 219 220 221 222 223 |
# File 'lib/solid_flow/workflow.rb', line 218 def initialize(ctx:, execution:, history: []) @ctx = ctx.with_indifferent_access @execution = execution @history = history @wait_context = nil end |
Instance Attribute Details
#ctx ⇒ Object (readonly)
Returns the value of attribute ctx.
216 217 218 |
# File 'lib/solid_flow/workflow.rb', line 216 def ctx @ctx end |
#execution ⇒ Object (readonly)
Returns the value of attribute execution.
216 217 218 |
# File 'lib/solid_flow/workflow.rb', line 216 def execution @execution end |
#history ⇒ Object (readonly)
Returns the value of attribute history.
216 217 218 |
# File 'lib/solid_flow/workflow.rb', line 216 def history @history end |
Class Method Details
.compensate(step_name, with:) ⇒ Object
154 155 156 |
# File 'lib/solid_flow/workflow.rb', line 154 def compensate(step_name, with:) workflow_compensations[step_name.to_sym] = with.to_sym end |
.compensations ⇒ Object
170 171 172 |
# File 'lib/solid_flow/workflow.rb', line 170 def compensations workflow_compensations.dup end |
.defaults(options = {}) ⇒ Object
98 99 100 |
# File 'lib/solid_flow/workflow.rb', line 98 def defaults( = {}) workflow_defaults.deep_merge!(.deep_symbolize_keys) end |
.graph_signature ⇒ Object
174 175 176 |
# File 'lib/solid_flow/workflow.rb', line 174 def graph_signature Determinism.graph_signature(self) end |
.inherited(subclass) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/solid_flow/workflow.rb', line 43 def inherited(subclass) super SolidFlow.configuration.workflow_registry.register(subclass.workflow_name, subclass) subclass.instance_variable_set(:@workflow_steps, workflow_steps.map(&:dup)) subclass.instance_variable_set(:@workflow_signals, workflow_signals.deep_dup) subclass.instance_variable_set(:@workflow_queries, workflow_queries.deep_dup) subclass.instance_variable_set(:@signal_handlers, signal_handlers.deep_dup) subclass.instance_variable_set(:@query_handlers, query_handlers.deep_dup) subclass.instance_variable_set(:@workflow_defaults, workflow_defaults.deep_dup) subclass.instance_variable_set(:@workflow_compensations, workflow_compensations.deep_dup) end |
.on_query(name, method_name = nil, &block) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/solid_flow/workflow.rb', line 141 def on_query(name, method_name = nil, &block) handler = if method_name lambda { send(method_name) } else block end raise ArgumentError, "on_query requires a block or method name" unless handler query_handlers[name.to_sym] = handler end |
.on_signal(name, method_name = nil, &block) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/solid_flow/workflow.rb', line 124 def on_signal(name, method_name = nil, &block) handler = if method_name lambda { |payload| send(method_name, payload) } else block end raise ArgumentError, "on_signal requires a block or method name" unless handler signal_handlers[name.to_sym] = handler end |
.queries ⇒ Object
166 167 168 |
# File 'lib/solid_flow/workflow.rb', line 166 def queries workflow_queries.dup end |
.query(execution_id, query_name) ⇒ Object
137 138 139 |
# File 'lib/solid_flow/workflow.rb', line 137 def query(name) workflow_queries[name.to_sym] = {} end |
.query_handlers ⇒ Object
83 84 85 |
# File 'lib/solid_flow/workflow.rb', line 83 def query_handlers @query_handlers ||= {} end |
.signal(execution_id, signal_name, payload = {}) ⇒ Object
120 121 122 |
# File 'lib/solid_flow/workflow.rb', line 120 def signal(name, buffer: true) workflow_signals[name.to_sym] = { buffer: } end |
.signal_handlers ⇒ Object
79 80 81 |
# File 'lib/solid_flow/workflow.rb', line 79 def signal_handlers @signal_handlers ||= {} end |
.signals ⇒ Object
162 163 164 |
# File 'lib/solid_flow/workflow.rb', line 162 def signals workflow_signals.dup end |
.start(**input) ⇒ Object
178 179 180 181 182 183 184 185 186 |
# File 'lib/solid_flow/workflow.rb', line 178 def start(**input) SolidFlow.instrument("solidflow.execution.start", workflow: workflow_name, input:) signature = graph_signature SolidFlow.store.start_execution( workflow_class: self, input: input, graph_signature: signature ) end |
.step(name, task: nil, **options, &block) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/solid_flow/workflow.rb', line 102 def step(name, task: nil, **, &block) raise ArgumentError, "step #{name} already defined" if workflow_steps.any? { |s| s.name == name.to_sym } retry_policy = workflow_defaults[:retry].merge(.delete(:retry) || {}) timeouts = workflow_defaults[:timeouts].merge(.delete(:timeouts) || {}) idempotency = .delete(:idempotency_key) workflow_steps << StepDefinition.new( name: name.to_sym, task: task&.to_sym, block: block, retry_policy: retry_policy, timeouts: timeouts, idempotency_key: idempotency, options: ) end |
.steps ⇒ Object
158 159 160 |
# File 'lib/solid_flow/workflow.rb', line 158 def steps workflow_steps.dup end |
.workflow_compensations ⇒ Object
94 95 96 |
# File 'lib/solid_flow/workflow.rb', line 94 def workflow_compensations @workflow_compensations ||= {} end |
.workflow_defaults ⇒ Object
87 88 89 90 91 92 |
# File 'lib/solid_flow/workflow.rb', line 87 def workflow_defaults @workflow_defaults ||= { retry: {}, timeouts: {} } end |
.workflow_name(value = nil) ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/solid_flow/workflow.rb', line 56 def workflow_name(value = nil) if value normalized = value.to_s @workflow_name = normalized SolidFlow.configuration.workflow_registry.register(normalized, self) @workflow_name else @workflow_name ||= name || "anonymous_workflow" end end |
.workflow_queries ⇒ Object
75 76 77 |
# File 'lib/solid_flow/workflow.rb', line 75 def workflow_queries @workflow_queries ||= {} end |
.workflow_signals ⇒ Object
71 72 73 |
# File 'lib/solid_flow/workflow.rb', line 71 def workflow_signals @workflow_signals ||= {} end |
.workflow_steps ⇒ Object
67 68 69 |
# File 'lib/solid_flow/workflow.rb', line 67 def workflow_steps @workflow_steps ||= [] end |
Instance Method Details
#apply_signal(name, payload) ⇒ Object
239 240 241 242 243 244 |
# File 'lib/solid_flow/workflow.rb', line 239 def apply_signal(name, payload) handler = self.class.signal_handlers[name.to_sym] raise Errors::UnknownSignal, name unless handler instance_exec(payload.with_indifferent_access, &handler) end |
#cancel!(reason = "cancelled") ⇒ Object
254 255 256 257 |
# File 'lib/solid_flow/workflow.rb', line 254 def cancel!(reason = "cancelled") @cancelled = true raise Errors::Cancelled, reason end |
#cancelled? ⇒ Boolean
259 260 261 |
# File 'lib/solid_flow/workflow.rb', line 259 def cancelled? !!@cancelled end |
#consume_wait_instructions ⇒ Object
233 234 235 236 237 |
# File 'lib/solid_flow/workflow.rb', line 233 def consume_wait_instructions instructions = @wait_context&.instructions || [] @wait_context = nil instructions end |
#query_defined?(name) ⇒ Boolean
250 251 252 |
# File 'lib/solid_flow/workflow.rb', line 250 def query_defined?(name) self.class.query_handlers.key?(name.to_sym) end |
#reset_wait_context! ⇒ Object
229 230 231 |
# File 'lib/solid_flow/workflow.rb', line 229 def reset_wait_context! @wait_context = Wait::Context.new end |
#signal_defined?(name) ⇒ Boolean
246 247 248 |
# File 'lib/solid_flow/workflow.rb', line 246 def signal_defined?(name) self.class.signal_handlers.key?(name.to_sym) || self.class.workflow_signals.key?(name.to_sym) end |