Class: SolidFlow::Workflow

Inherits:
Object
  • Object
show all
Defined in:
lib/solid_flow/workflow.rb

Overview

Base class that developers subclass to implement durable workflows. Provides a DSL for defining steps, signals, queries, and compensations.

Defined Under Namespace

Classes: StepDefinition

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ctx:, execution:, history: []) ⇒ Workflow

Returns a new instance of Workflow.



218
219
220
221
222
223
# File 'lib/solid_flow/workflow.rb', line 218

# Builds a workflow instance bound to a single execution.
#
# @param ctx [#with_indifferent_access] workflow context; stored after
#   conversion via +with_indifferent_access+
# @param execution [Object] the execution this instance belongs to
# @param history [Array] previously recorded history (defaults to a new empty array)
def initialize(ctx:, execution:, history: [])
  @ctx = ctx.with_indifferent_access
  @execution, @history = execution, history
  # No wait context exists until #wait or #reset_wait_context! creates one.
  @wait_context = nil
end

Instance Attribute Details

#ctx ⇒ Object (readonly)

Returns the value of attribute ctx.



216
217
218
# File 'lib/solid_flow/workflow.rb', line 216

# Reader for the workflow context stored by the constructor.
#
# @return [Object] the value of @ctx
def ctx = @ctx

#execution ⇒ Object (readonly)

Returns the value of attribute execution.



216
217
218
# File 'lib/solid_flow/workflow.rb', line 216

# Reader for the execution passed to the constructor.
#
# @return [Object] the value of @execution
def execution = @execution

#history ⇒ Object (readonly)

Returns the value of attribute history.



216
217
218
# File 'lib/solid_flow/workflow.rb', line 216

# Reader for the history passed to the constructor.
#
# @return [Object] the value of @history
def history = @history

Class Method Details

.compensate(step_name, with:) ⇒ Object



154
155
156
# File 'lib/solid_flow/workflow.rb', line 154

# Registers the compensation to use for a step.
#
# Both names are converted to symbols before being stored in
# +workflow_compensations+.
#
# @param step_name [#to_sym] name of the step being compensated
# @param with [#to_sym] name of the compensation
# @return [Symbol] the stored compensation name
def compensate(step_name, with:)
  workflow_compensations.store(step_name.to_sym, with.to_sym)
end

.compensations ⇒ Object



170
171
172
# File 'lib/solid_flow/workflow.rb', line 170

# Returns a shallow copy of the registered compensations, so adding or
# removing keys on the result does not affect the class-level registry.
#
# @return [Hash] copy of +workflow_compensations+
def compensations = workflow_compensations.dup

.defaults(options = {}) ⇒ Object



98
99
100
# File 'lib/solid_flow/workflow.rb', line 98

# Merges workflow-wide default options into +workflow_defaults+.
#
# Keys of +options+ are deep-symbolized first, then deep-merged in place.
#
# @param options [Hash] defaults to merge
# @return [Object] the result of +deep_merge!+ on +workflow_defaults+
def defaults(options = {})
  overrides = options.deep_symbolize_keys
  workflow_defaults.deep_merge!(overrides)
end

.graph_signature ⇒ Object



174
175
176
# File 'lib/solid_flow/workflow.rb', line 174

# Delegates to +Determinism.graph_signature+, passing this workflow class.
#
# @return [Object] whatever +Determinism.graph_signature+ returns
def graph_signature = Determinism.graph_signature(self)

.inherited(subclass) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/solid_flow/workflow.rb', line 43

# Subclass hook: registers the new workflow class and hands it its own copies
# of the parent's DSL state, so definitions made on the subclass do not write
# into the parent's collections.
#
# @param subclass [Class] the class that has just inherited from this one
def inherited(subclass)
  super

  registry = SolidFlow.configuration.workflow_registry
  registry.register(subclass.workflow_name, subclass)

  # Steps are duplicated one by one; every other collection is deep-duplicated.
  subclass.instance_variable_set(:@workflow_steps, workflow_steps.map(&:dup))

  %i[
    workflow_signals
    workflow_queries
    signal_handlers
    query_handlers
    workflow_defaults
    workflow_compensations
  ].each do |reader|
    subclass.instance_variable_set(:"@#{reader}", send(reader).deep_dup)
  end
end

.on_query(name, method_name = nil, &block) ⇒ Object

Raises:

  • (ArgumentError)


141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/solid_flow/workflow.rb', line 141

# Registers the handler that answers the query +name+.
#
# When +method_name+ is given, the handler is a zero-argument lambda that
# sends +method_name+; otherwise the supplied block is used as the handler.
#
# @param name [#to_sym] query name
# @param method_name [Symbol, String, nil] method to send when the query runs
# @return [Proc] the registered handler
# @raise [ArgumentError] when neither a method name nor a block is given
def on_query(name, method_name = nil, &block)
  raise ArgumentError, "on_query requires a block or method name" unless method_name || block

  query_handlers[name.to_sym] = method_name ? -> { send(method_name) } : block
end

.on_signal(name, method_name = nil, &block) ⇒ Object

Raises:

  • (ArgumentError)


124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/solid_flow/workflow.rb', line 124

# Registers the handler invoked when the signal +name+ is applied.
#
# When +method_name+ is given, the handler is a one-argument lambda that
# sends +method_name+ with the payload; otherwise the supplied block is used.
#
# @param name [#to_sym] signal name
# @param method_name [Symbol, String, nil] method to send with the payload
# @return [Proc] the registered handler
# @raise [ArgumentError] when neither a method name nor a block is given
def on_signal(name, method_name = nil, &block)
  raise ArgumentError, "on_signal requires a block or method name" unless method_name || block

  signal_handlers[name.to_sym] = method_name ? ->(payload) { send(method_name, payload) } : block
end

.queries ⇒ Object



166
167
168
# File 'lib/solid_flow/workflow.rb', line 166

# Returns a shallow copy of the declared queries, so adding or removing keys
# on the result does not affect the class-level registry.
#
# @return [Hash] copy of +workflow_queries+
def queries = workflow_queries.dup

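.query(execution_id, query_name) ⇒ Object

Note: the source excerpt shown below (line 137) is the DSL declaration `query(name)`; it does not match the `(execution_id, query_name)` signature in this heading.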



137
138
139
# File 'lib/solid_flow/workflow.rb', line 137

# Declares a query on the workflow by recording an empty options hash for it
# in +workflow_queries+.
#
# @param name [#to_sym] query name
# @return [Hash] the (empty) options hash stored for the query
def query(name)
  workflow_queries.store(name.to_sym, {})
end

.query_handlers ⇒ Object



83
84
85
# File 'lib/solid_flow/workflow.rb', line 83

# Lazily-initialised registry of query handlers, keyed by query name.
#
# @return [Hash] the memoized handler hash (created empty on first access)
def query_handlers
  return @query_handlers if @query_handlers

  @query_handlers = {}
end

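.signal(execution_id, signal_name, payload = {}) ⇒ Object

Note: the source excerpt shown below (line 120) is the DSL declaration `signal(name, buffer: true)`; it does not match the `(execution_id, signal_name, payload = {})` signature in this heading.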



120
121
122
# File 'lib/solid_flow/workflow.rb', line 120

# Declares a signal on the workflow, recording its +buffer+ option in
# +workflow_signals+.
#
# @param name [#to_sym] signal name
# @param buffer [Boolean] stored under the :buffer key (defaults to true)
# @return [Hash] the options hash stored for the signal
def signal(name, buffer: true)
  workflow_signals.store(name.to_sym, { buffer: buffer })
end

.signal_handlers ⇒ Object



79
80
81
# File 'lib/solid_flow/workflow.rb', line 79

# Lazily-initialised registry of signal handlers, keyed by signal name.
#
# @return [Hash] the memoized handler hash (created empty on first access)
def signal_handlers
  return @signal_handlers if @signal_handlers

  @signal_handlers = {}
end

.signals ⇒ Object



162
163
164
# File 'lib/solid_flow/workflow.rb', line 162

# Returns a shallow copy of the declared signals, so adding or removing keys
# on the result does not affect the class-level registry.
#
# @return [Hash] copy of +workflow_signals+
def signals = workflow_signals.dup

.start(**input) ⇒ Object



178
179
180
181
182
183
184
185
186
# File 'lib/solid_flow/workflow.rb', line 178

# Starts a new execution of this workflow.
#
# Emits the "solidflow.execution.start" instrumentation event, then asks
# +SolidFlow.store+ to start the execution with this class, the given input
# and the workflow's graph signature.
#
# @param input [Hash] keyword arguments forwarded as the execution input
# @return [Object] whatever +SolidFlow.store.start_execution+ returns
def start(**input)
  SolidFlow.instrument("solidflow.execution.start", workflow: workflow_name, input: input)

  execution_args = {
    workflow_class: self,
    input: input,
    graph_signature: graph_signature
  }
  SolidFlow.store.start_execution(**execution_args)
end

.step(name, task: nil, **options, &block) ⇒ Object

Raises:

  • (ArgumentError)


102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/solid_flow/workflow.rb', line 102

# Declares a step and appends its definition to +workflow_steps+.
#
# The :retry and :timeouts options are merged over the corresponding entries
# of +workflow_defaults+; :idempotency_key is extracted as well. Whatever
# remains in +options+ is passed through to the StepDefinition untouched.
#
# @param name [#to_sym] unique step name
# @param task [#to_sym, nil] optional task name
# @param options [Hash] per-step options (:retry, :timeouts, :idempotency_key, others)
# @param block [Proc, nil] optional step body
# @return [Array] the step list with the new definition appended
# @raise [ArgumentError] when a step with the same name already exists
def step(name, task: nil, **options, &block)
  step_name = name.to_sym
  if workflow_steps.any? { |existing| existing.name == step_name }
    raise ArgumentError, "step #{name} already defined"
  end

  base = workflow_defaults

  # The deletes run while the arguments are evaluated, so by the time
  # `options:` is read it only holds the keys not consumed above it.
  workflow_steps << StepDefinition.new(
    name: step_name,
    task: task&.to_sym,
    block: block,
    retry_policy: base[:retry].merge(options.delete(:retry) || {}),
    timeouts: base[:timeouts].merge(options.delete(:timeouts) || {}),
    idempotency_key: options.delete(:idempotency_key),
    options: options
  )
end

.steps ⇒ Object



158
159
160
# File 'lib/solid_flow/workflow.rb', line 158

# Returns a shallow copy of the declared steps, so appending to or removing
# from the result does not affect the class-level list.
#
# @return [Array] copy of +workflow_steps+
def steps = workflow_steps.dup

.workflow_compensations ⇒ Object



94
95
96
# File 'lib/solid_flow/workflow.rb', line 94

# Lazily-initialised registry mapping step names to compensation names.
#
# @return [Hash] the memoized hash (created empty on first access)
def workflow_compensations
  return @workflow_compensations if @workflow_compensations

  @workflow_compensations = {}
end

.workflow_defaults ⇒ Object



87
88
89
90
91
92
# File 'lib/solid_flow/workflow.rb', line 87

# Lazily-initialised workflow-wide defaults.
#
# @return [Hash] the memoized defaults; on first access it holds empty
#   :retry and :timeouts hashes
def workflow_defaults
  return @workflow_defaults if @workflow_defaults

  @workflow_defaults = { retry: {}, timeouts: {} }
end

.workflow_name(value = nil) ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/solid_flow/workflow.rb', line 56

# Reads or sets the name this workflow is registered under.
#
# With no argument, returns the memoized name, falling back to the class
# name and then to "anonymous_workflow". With an argument, stores its string
# form and registers this class under it in the workflow registry.
#
# @param value [#to_s, nil] new workflow name, or nil to read the current one
# @return [String] the workflow name
def workflow_name(value = nil)
  return (@workflow_name ||= name || "anonymous_workflow") unless value

  @workflow_name = value.to_s
  SolidFlow.configuration.workflow_registry.register(@workflow_name, self)
  @workflow_name
end

.workflow_queries ⇒ Object



75
76
77
# File 'lib/solid_flow/workflow.rb', line 75

# Lazily-initialised registry of declared queries, keyed by query name.
#
# @return [Hash] the memoized hash (created empty on first access)
def workflow_queries
  return @workflow_queries if @workflow_queries

  @workflow_queries = {}
end

.workflow_signals ⇒ Object



71
72
73
# File 'lib/solid_flow/workflow.rb', line 71

# Lazily-initialised registry of declared signals, keyed by signal name.
#
# @return [Hash] the memoized hash (created empty on first access)
def workflow_signals
  return @workflow_signals if @workflow_signals

  @workflow_signals = {}
end

.workflow_steps ⇒ Object



67
68
69
# File 'lib/solid_flow/workflow.rb', line 67

# Lazily-initialised, ordered list of step definitions.
#
# @return [Array] the memoized array (created empty on first access)
def workflow_steps
  return @workflow_steps if @workflow_steps

  @workflow_steps = []
end

Instance Method Details

#apply_signal(name, payload) ⇒ Object



239
240
241
242
243
244
# File 'lib/solid_flow/workflow.rb', line 239

# Dispatches a signal to the handler registered under +name+.
#
# The handler is evaluated with +instance_exec+, so it runs with this
# workflow instance as self and receives the payload as its only argument.
#
# @param name [#to_sym] signal name used to look up the handler
# @param payload [Hash, nil] signal data; nil is treated as an empty payload
# @return [Object] whatever the handler returns
# @raise [Errors::UnknownSignal] when no handler is registered for +name+
def apply_signal(name, payload)
  handler = self.class.signal_handlers[name.to_sym]
  raise Errors::UnknownSignal, name unless handler

  # Normalise a missing payload so handlers always receive a hash instead of
  # the call failing with NoMethodError on nil.
  instance_exec((payload || {}).with_indifferent_access, &handler)
end

#cancel!(reason = "cancelled") ⇒ Object

Raises:

  • (Errors::Cancelled)

254
255
256
257
# File 'lib/solid_flow/workflow.rb', line 254

# Marks the workflow as cancelled and aborts by raising.
#
# The cancelled flag is set before raising, so #cancelled? reports true
# once the error has been rescued.
#
# @param reason [String] message for the raised error
# @raise [Errors::Cancelled] always
def cancel!(reason = "cancelled")
  @cancelled = true
  raise Errors::Cancelled.exception(reason)
end

#cancelled? ⇒ Boolean

Returns:

  • (Boolean)


259
260
261
# File 'lib/solid_flow/workflow.rb', line 259

# Reports whether the cancelled flag has been set (see #cancel!).
#
# @return [Boolean] true when @cancelled is truthy, false otherwise
def cancelled?
  @cancelled ? true : false
end

#consume_wait_instructions ⇒ Object



233
234
235
236
237
# File 'lib/solid_flow/workflow.rb', line 233

# Returns the instructions collected on the current wait context and then
# discards that context.
#
# @return [Object] the context's instructions, or an empty array when there
#   is no wait context or it has no instructions
def consume_wait_instructions
  # Read the instructions first; the context is dropped only afterwards.
  (@wait_context&.instructions || []).tap { @wait_context = nil }
end

#query_defined?(name) ⇒ Boolean

Returns:

  • (Boolean)


250
251
252
# File 'lib/solid_flow/workflow.rb', line 250

# Tells whether the workflow class has a handler registered for a query.
#
# @param name [#to_sym] query name
# @return [Boolean] true when +query_handlers+ has an entry for the name
def query_defined?(name)
  handlers = self.class.query_handlers
  handlers.key?(name.to_sym)
end

#reset_wait_context! ⇒ Object



229
230
231
# File 'lib/solid_flow/workflow.rb', line 229

# Replaces the current wait context with a new +Wait::Context+.
#
# @return [Wait::Context] the newly created context
def reset_wait_context!
  fresh_context = Wait::Context.new
  @wait_context = fresh_context
end

#signal_defined?(name) ⇒ Boolean

Returns:

  • (Boolean)


246
247
248
# File 'lib/solid_flow/workflow.rb', line 246

# Tells whether the workflow class knows a signal, either through a
# registered handler or through a declaration in +workflow_signals+.
#
# @param name [#to_sym] signal name
# @return [Boolean] true when either registry has an entry for the name
def signal_defined?(name)
  key = name.to_sym
  workflow_class = self.class
  workflow_class.signal_handlers.key?(key) || workflow_class.workflow_signals.key?(key)
end

#wait ⇒ Object



225
226
227
# File 'lib/solid_flow/workflow.rb', line 225

# Returns the current wait context, creating a +Wait::Context+ on first use.
#
# @return [Wait::Context] the memoized wait context
def wait
  return @wait_context if @wait_context

  @wait_context = Wait::Context.new
end