Class: LangGraphRB::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/langgraph_rb/runner.rb

Defined Under Namespace

Classes: ExecutionFrame

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(graph, store:, thread_id:, observers: []) ⇒ Runner

Returns a new instance of Runner.



9
10
11
12
13
14
15
16
17
# File 'lib/langgraph_rb/runner.rb', line 9

# Builds a runner for one graph / thread pair.
#
# @param graph [Object] graph to execute; exposed through #graph
# @param store [Object] checkpoint store; exposed through #store
# @param thread_id [Object] thread identifier; exposed through #thread_id
# @param observers [Array, Object, nil] one observer or a list of them;
#   normalised with Kernel#Array, so nil becomes an empty list
def initialize(graph, store:, thread_id:, observers: [])
  # Collaborators supplied by the caller.
  @observers = Array(observers)
  @thread_id = thread_id
  @store     = store
  @graph     = graph

  # Per-run bookkeeping, all starting out in the "nothing has run yet" state.
  @interrupt_handler = nil
  @execution_queue   = Queue.new
  @step_number       = 0
end

Instance Attribute Details

#graph ⇒ Object (readonly)

Returns the value of attribute graph.



7
8
9
# File 'lib/langgraph_rb/runner.rb', line 7

# Read-only accessor.
#
# @return [Object] the current value of @graph
def graph
  @graph
end

#store ⇒ Object (readonly)

Returns the value of attribute store.



7
8
9
# File 'lib/langgraph_rb/runner.rb', line 7

# Read-only accessor.
#
# @return [Object] the current value of @store
def store
  @store
end

#thread_id ⇒ Object (readonly)

Returns the value of attribute thread_id.



7
8
9
# File 'lib/langgraph_rb/runner.rb', line 7

# Read-only accessor.
#
# @return [Object] the current value of @thread_id
def thread_id
  @thread_id
end

Instance Method Details

#invoke(initial_state, context: nil) ⇒ Object

Synchronous execution



20
21
22
23
24
25
26
27
28
# File 'lib/langgraph_rb/runner.rb', line 20

# Runs the graph to completion and returns only the resulting state.
#
# Delegates to #stream and keeps the last step it yields. When #stream
# finishes without yielding a single step, the state from the summary hash
# that #stream returns is used instead, so the method never dereferences nil.
#
# @param initial_state [Object] state handed to #stream
# @param context [Object, nil] forwarded to #stream unchanged
# @return [Object] the :state entry of the last yielded step, or of the
#   summary returned by #stream when no step was yielded
def invoke(initial_state, context: nil)
  last_step = nil

  summary = stream(initial_state, context: context) do |step_result|
    last_step = step_result
  end

  (last_step || summary)[:state]
end

#on_interrupt(&handler) ⇒ Object

Sets the handler that is called for human-in-the-loop interrupts.



158
159
160
# File 'lib/langgraph_rb/runner.rb', line 158

# Registers the block to call when a node reports an interrupt.
#
# The block is stored in @interrupt_handler, replacing any previously
# registered handler. Calling this method without a block stores nil.
#
# @yieldparam interrupt [Object] NOTE(review): presumably the :interrupt
#   entry of the step result, as passed by #stream — confirm against caller
# @return [Proc, nil] the handler that was stored
def on_interrupt(&handler)
  @interrupt_handler = handler
end

#resume(additional_input = {}, context: nil) ⇒ Object

Resume from checkpoint

Raises:

  • (GraphError) — if the store has no checkpoint for this thread



146
147
148
149
150
151
152
153
154
155
# File 'lib/langgraph_rb/runner.rb', line 146

# Continues execution for this runner's thread from its stored checkpoint.
#
# Loads the checkpoint for @thread_id from the store, merges
# +additional_input+ into the checkpointed state with +merge_delta+, and
# passes the merged state to #stream. A block given to this method is
# forwarded to #stream so resumed runs can be observed step by step.
#
# NOTE(review): #stream assigns @step_number = 0 and seeds its queue with
# Graph::START, so the step number restored below does not survive the call
# and execution restarts from the beginning — confirm this is intended.
#
# @param additional_input [Hash, nil] delta merged into the checkpointed
#   state; nil is treated as an empty delta
# @param context [Object, nil] forwarded to #stream unchanged
# @yield [step_result] forwarded to #stream
# @return [Object] whatever #stream returns
# @raise [GraphError] when the store returns no checkpoint for @thread_id
def resume(additional_input = {}, context: nil, &block)
  checkpoint = @store.load(@thread_id)
  raise GraphError, "No checkpoint found for thread #{@thread_id}" unless checkpoint

  @step_number = checkpoint[:step_number]
  resumed_state = checkpoint[:state].merge_delta(additional_input || {})

  stream(resumed_state, context: context, &block)
end

#stream(initial_state, context: nil, &block) ⇒ Object

Streaming execution with optional block for receiving intermediate results



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/langgraph_rb/runner.rb', line 31

# Executes the graph one super-step at a time, starting at Graph::START.
#
# Each iteration runs every queued frame through execute_super_step, routes
# the results into the next queue, checkpoints one state via save_checkpoint
# and, when a block is given, yields a summary of the step. The loop stops
# when the queue is empty, when a super-step returns no results, or when a
# final state was produced (Graph::FINISH reached, or an interrupt occurred
# with no handler registered).
#
# @param initial_state [Object] state given to the START frame
# @param context [Object, nil] passed through to execute_super_step and
#   determine_next_destinations
# @yieldparam step_result [Hash] summary with the keys :step, :state,
#   :active_nodes (node names queued for the next step) and :completed
#   (true when nothing is queued or a final state was produced)
# @return [Hash] summary with the keys :state, :step_number and :thread_id
# @raise [Object] the error object carried by an :error step result; for
#   StandardError failures notify_graph_end is called before re-raising
def stream(initial_state, context: nil, &block)
  notify_graph_start(initial_state, context)

  @step_number = 0
  current_state = initial_state

  # The first super-step consists of the START node only.
  active_executions = [
    ExecutionFrame.new(Graph::START, current_state, 0)
  ]

  loop do
    break if active_executions.empty?

    # Run every frame queued for the current level.
    step_results = execute_super_step(active_executions, context)
    break if step_results.empty?

    @step_number += 1

    next_active = []
    final_state = nil

    step_results.each do |result|
      case result[:type]
      when :completed
        # An explicit destination on the result takes precedence over the
        # graph's edge routing; both are normalised to [name, state] pairs.
        destinations =
          if result[:next_destination]
            [[result[:next_destination], result[:state]]]
          else
            determine_next_destinations(result[:node_name], result[:state], context)
          end

        destinations.each do |dest_name, dest_state|
          if dest_name == Graph::FINISH
            final_state = dest_state
          else
            next_active << ExecutionFrame.new(dest_name, dest_state, @step_number, from_node: result[:node_name])
          end
        end

      when :send
        # Fan out: one frame per send command, each with its payload merged in.
        result[:sends].each do |send_cmd|
          payload_state = result[:state].merge_delta(send_cmd.payload)
          next_active << ExecutionFrame.new(send_cmd.to, payload_state, @step_number, from_node: result[:node_name])
        end

      when :interrupt
        if @interrupt_handler
          # Re-queue the interrupted node with the handler's input merged in.
          user_input = @interrupt_handler.call(result[:interrupt])
          updated_state = result[:state].merge_delta(user_input || {})
          next_active << ExecutionFrame.new(result[:node_name], updated_state, @step_number, from_node: result[:node_name])
        else
          # No interrupt handler: treat the interrupt as completion.
          final_state = result[:state]
        end

      when :error
        raise result[:error]
      end
    end

    # Only one state is checkpointed per step: the final state if there is
    # one, otherwise the first queued frame's state, otherwise the previous one.
    checkpoint_state = final_state || next_active.first&.state || current_state
    save_checkpoint(checkpoint_state, @step_number)

    if block
      # The loop ends after this step whenever a final state exists, so the
      # step is reported as completed even if sibling frames were still queued.
      finished = final_state ? true : false
      yield({
        step: @step_number,
        state: checkpoint_state,
        active_nodes: next_active.map(&:node_name),
        completed: next_active.empty? || finished
      })
    end

    current_state = checkpoint_state
    active_executions = next_active

    break if final_state
  end

  summary = {
    state: current_state,
    step_number: @step_number,
    thread_id: @thread_id
  }

  notify_graph_end(current_state)
  summary
rescue
  # current_state is still nil if notify_graph_start itself failed.
  notify_graph_end(current_state || initial_state)
  raise
end