Class: Gaskit::Flow

Inherits:
Object
  • Object
show all
Includes:
Hookable
Defined in:
lib/gaskit/flow.rb

Overview

The ‘Gaskit::Flow` class defines and executes a pipeline of operations, each of which returns a `Gaskit::OperationResult`. Flows can be defined via a class-based DSL or run inline using a block.

Steps share context and flow state, with support for early exits, manual control, and step-by-step walking. A flow is constructed from a sequence of registered operations, each of which may receive and return input to influence subsequent steps.

## Features

  • Declarative or block-based step definitions

  • Per-step argument and context overrides

  • Shared flow-level context with metadata injection

  • Manual stepping (‘walk` and `next_step`)

  • Rewind capability for retryable flows

  • Hookable lifecycle via ‘Gaskit::Hookable`

Examples:

Inline (block-based) flow

result = Gaskit::Flow.call(1) do
  step Add
  step Double
end

result.value # => 4

Class-based flow

class MyFlow < Gaskit::Flow
  step Add
  step Double
end

result = MyFlow.call(1)
result.value # => 4

Step-by-step execution (walk)

flow = MyFlow.walk(1)
while flow.has_next_step?
  flow.next_step
end
flow.result.value # => 4

Rewinding and re-running

flow.rewind
flow.next_step # starts from first step again

See Also:

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Hookable

#apply_after_hooks, #apply_around_hooks, #apply_before_hooks, #apply_hooks, included

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



155
156
157
# File 'lib/gaskit/flow.rb', line 155

def logger
  @logger
end

Class Method Details

.call(*args, context: {}, **kwargs, &block) ⇒ FlowResult

Executes the flow with soft failure handling.

Parameters:

  • args (Array)

    Positional arguments for the first step.

  • context (Hash) (defaults to: {})

    Shared context across all steps.

  • kwargs (Hash)

    Keyword arguments for the first step.

Returns:

  • (FlowResult)

    The result of the flow execution.



94
95
96
# File 'lib/gaskit/flow.rb', line 94

def call(*args, context: {}, **kwargs, &block)
  invoke(false, context, *args, **kwargs, &block)
end

.call!(*args, context: {}, **kwargs, &block) ⇒ FlowResult

Executes the flow with hard failure handling (raises on unhandled errors).

Parameters:

  • args (Array)

    Positional arguments for the first step.

  • context (Hash) (defaults to: {})

    Shared context across all steps.

  • kwargs (Hash)

    Keyword arguments for the first step.

Returns:

  • (FlowResult)

    The result of the flow execution.

Raises:

  • (StandardError)

    If an error occurs in any step.



105
106
107
# File 'lib/gaskit/flow.rb', line 105

def call!(*args, context: {}, **kwargs, &block)
  invoke(true, context, *args, **kwargs, &block)
end

.inherited(subclass) ⇒ void

This method returns an undefined value.

Called when a subclass is defined, initializing an empty step list.

Parameters:

  • subclass (Class)

    the subclass inheriting from Flow



66
67
68
69
# File 'lib/gaskit/flow.rb', line 66

def inherited(subclass)
  subclass.instance_variable_set(:@steps, @steps)
  super
end

.invoke(raise_on_failure, context, *args, **kwargs, &block) ⇒ FlowResult

Internal flow execution logic.

Parameters:

  • raise_on_failure (Boolean)

    Whether to raise on failure.

  • context (Hash)

    Execution context.

  • args (Array)

    Initial positional arguments.

  • kwargs (Hash)

    Initial keyword arguments.

Returns:

  • (FlowResult)

    The result of the executed flow.



136
137
138
139
# File 'lib/gaskit/flow.rb', line 136

def invoke(raise_on_failure, context, *args, **kwargs, &block)
  flow = build(raise_on_failure, context, *args, **kwargs, &block)
  flow.send(:execute, &block)
end

.step(operation, context: {}, **kwargs) ⇒ void

This method returns an undefined value.

Registers a step in the class-level flow definition.

Parameters:

  • operation (Class<Gaskit::Operation>)

    The operation to run.

  • context (Hash) (defaults to: {})

    Optional context for this step.

  • kwargs (Hash)

    Keyword arguments for this step.



84
85
86
# File 'lib/gaskit/flow.rb', line 84

def step(operation, context: {}, **kwargs)
  steps << [operation, context, kwargs]
end

.stepsArray<Array>

Returns the list of declared steps for the flow class.

Returns:

  • (Array<Array>)

    An array of [operation, context, kwargs] triples.



74
75
76
# File 'lib/gaskit/flow.rb', line 74

def steps
  @steps ||= []
end

.walk(*args, context: {}, **kwargs) ⇒ Flow

Creates a flow instance for step-by-step execution.

Parameters:

  • args (Array)

    Initial positional arguments.

  • context (Hash) (defaults to: {})

    Shared execution context.

  • kwargs (Hash)

    Initial keyword arguments.

Returns:

  • (Flow)

    A walkable flow instance.



115
116
117
# File 'lib/gaskit/flow.rb', line 115

def walk(*args, context: {}, **kwargs)
  build(false, context, *args, **kwargs)
end

.walk!(*args, context: {}, **kwargs) ⇒ Flow

Same as #walk but raises on any step failure.

Parameters:

  • args (Array)

    Initial positional arguments.

  • context (Hash) (defaults to: {})

    Shared execution context.

  • kwargs (Hash)

    Initial keyword arguments.

Returns:

  • (Flow)

    A walkable flow instance with hard failure behavior.



125
126
127
# File 'lib/gaskit/flow.rb', line 125

def walk!(*args, context: {}, **kwargs)
  build(true, context, *args, **kwargs)
end

Instance Method Details

#next_step(*args, **kwargs) ⇒ Gaskit::OperationResult?

Executes the next step in the flow.

Parameters:

  • args (Array)

    Optional positional arguments to override input.

  • kwargs (Hash)

    Optional keyword arguments to override input.

Returns:



186
187
188
189
190
191
192
193
# File 'lib/gaskit/flow.rb', line 186

def next_step(*args, **kwargs)
  return unless next_step?

  operation, context, step_kwargs = @step_sequence[@step_index]
  @step_index += 1

  process_step(operation, context, step_kwargs, args, kwargs)
end

#next_step?Boolean

Returns true if there are more steps remaining in the sequence.

Returns:

  • (Boolean)

    Whether the flow has more steps to execute.



160
161
162
# File 'lib/gaskit/flow.rb', line 160

def next_step?
  @step_index < @step_sequence.size
end

#pending_step(*args, **kwargs) ⇒ Hash?

Returns the metadata for the pending step.

Returns:

  • (Hash, nil)

    The pending step’s operation, context, and kwargs, or nil if no steps remain.



167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/gaskit/flow.rb', line 167

def pending_step(*args, **kwargs)
  return nil unless next_step?

  operation, context, step_kwargs = @step_sequence[@step_index]
  context = @context.merge(context)
  args, kwargs = resolve_step_input(
    args: args,
    kwargs: kwargs,
    step_kwargs: step_kwargs
  )

  { operation: operation, context: context, args: args, kwargs: kwargs }
end

#resultsArray<Hash>

Returns the result hashes from all executed steps.

Returns:

  • (Array<Hash>)

    Array of step metadata hashes including input/output.



218
219
220
# File 'lib/gaskit/flow.rb', line 218

def results
  @steps.map { |entry| entry[:result] }
end

#rewindvoid

This method returns an undefined value.

Rewinds the flow to its initial state.



208
209
210
211
212
213
# File 'lib/gaskit/flow.rb', line 208

def rewind
  @input = @initial_input.dup
  @result = nil
  @steps.clear
  @step_index = 0
end

#step(operation, context: {}, **kwargs) ⇒ Gaskit::OperationResult

Runs a step inline from within a block-based flow definition.

Parameters:

  • operation (Class<Gaskit::Operation>)

    The operation to execute.

  • context (Hash) (defaults to: {})

    Additional context for the step.

  • kwargs (Hash)

    Keyword arguments for the step.

Returns:



201
202
203
# File 'lib/gaskit/flow.rb', line 201

def step(operation, context: {}, **kwargs)
  process_step(operation, context, kwargs)
end