Class: Agentic::PlanOrchestrator

Inherits:
Object
  • Object
show all
Defined in:
lib/agentic/plan_orchestrator.rb

Overview

Orchestrates the execution of tasks in a plan, handling dependencies and concurrency

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plan_id: SecureRandom.uuid, concurrency_limit: 10, retry_policy: {}, lifecycle_hooks: {}) ⇒ PlanOrchestrator

Initializes a new plan orchestrator

Parameters:

  • plan_id (String) (defaults to: SecureRandom.uuid)

    Optional plan id, will be generated if not provided

  • concurrency_limit (Integer) (defaults to: 10)

    Maximum number of tasks to execute concurrently

  • retry_policy (Hash) (defaults to: {})

    Configuration for retry behavior

  • lifecycle_hooks (Hash) (defaults to: {})

    Configuration for execution lifecycle hooks



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/agentic/plan_orchestrator.rb', line 27

def initialize(plan_id: SecureRandom.uuid, concurrency_limit: 10, retry_policy: {}, lifecycle_hooks: {})
  @plan_id = plan_id
  @tasks = {}
  @dependencies = {}
  @results = {}
  @execution_state = {
    pending: Set.new,
    in_progress: Set.new,
    completed: Set.new,
    failed: Set.new,
    canceled: Set.new
  }
  @concurrency_limit = concurrency_limit
  @async_tasks = {}

  # Configure retry policy with defaults
  @retry_policy = {
    max_retries: 3,
    retryable_errors: ["TimeoutError"],
    backoff_strategy: :constant
  }.merge(retry_policy)

  # Configure lifecycle hooks with callable defaults (no-ops)
  @lifecycle_hooks = {
    before_agent_build: ->(task_id:, task:) {},          # Called before an agent is built
    after_agent_build: ->(task_id:, task:, agent:, build_duration:) {}, # Called after an agent is built
    before_task_execution: ->(task_id:, task:) {},        # Called before a task is executed
    after_task_success: ->(task_id:, task:, result:, duration:) {}, # Called after a task succeeds
    after_task_failure: ->(task_id:, task:, failure:, duration:) {}, # Called after a task fails
    plan_completed: ->(plan_id:, status:, execution_time:, tasks:, results:) {} # Called when plan completes
  }.merge(lifecycle_hooks)
end

Instance Attribute Details

#execution_stateHash (readonly)

Current state of all tasks in the plan

Returns:

  • (Hash)

    the current value of execution_state



18
19
20
# File 'lib/agentic/plan_orchestrator.rb', line 18

def execution_state
  @execution_state
end

#lifecycle_hooksObject (readonly)

Returns the value of attribute lifecycle_hooks.



19
20
21
# File 'lib/agentic/plan_orchestrator.rb', line 19

def lifecycle_hooks
  @lifecycle_hooks
end

#plan_idString (readonly)

Unique identifier for the plan

Returns:

  • (String)

    the current value of plan_id



18
19
20
# File 'lib/agentic/plan_orchestrator.rb', line 18

def plan_id
  @plan_id
end

#resultsHash (readonly)

Results of task execution

Returns:

  • (Hash)

    the current value of results



18
19
20
# File 'lib/agentic/plan_orchestrator.rb', line 18

def results
  @results
end

#retry_policyObject (readonly)

Returns the value of attribute retry_policy.



19
20
21
# File 'lib/agentic/plan_orchestrator.rb', line 19

def retry_policy
  @retry_policy
end

#tasksHash (readonly)

Map of task ids to Task objects

Returns:

  • (Hash)

    the current value of tasks



18
19
20
# File 'lib/agentic/plan_orchestrator.rb', line 18

def tasks
  @tasks
end

Instance Method Details

#add_task(task, dependencies = []) ⇒ void

This method returns an undefined value.

Adds a task to the plan with optional dependencies

Parameters:

  • task (Task)

    The task to add

  • dependencies (Array<String>) (defaults to: [])

    Array of task ids that this task depends on



64
65
66
67
68
69
# File 'lib/agentic/plan_orchestrator.rb', line 64

def add_task(task, dependencies = [])
  task_id = task.id
  @tasks[task_id] = task
  @dependencies[task_id] = Array(dependencies)
  @execution_state[:pending].add(task_id)
end

#all_dependencies_met?(task_id) ⇒ Boolean

Checks if all dependencies for a task are met

Parameters:

  • task_id (String)

    ID of the task to check

Returns:

  • (Boolean)

    True if all dependencies are met, false otherwise



221
222
223
224
225
226
# File 'lib/agentic/plan_orchestrator.rb', line 221

def all_dependencies_met?(task_id)
  deps = @dependencies[task_id] || []
  deps.all? do |dep_id|
    @execution_state[:completed].include?(dep_id)
  end
end

#apply_retry_backoff(task:) ⇒ void

This method returns an undefined value.

Applies a delay based on the backoff strategy before retrying

Parameters:

  • task (Task)

    The task being retried



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/agentic/plan_orchestrator.rb', line 184

def apply_retry_backoff(task:)
  return if @retry_policy[:backoff_strategy] == :none

  delay = case @retry_policy[:backoff_strategy]
  when :constant
    # Constant delay (default 1 second)
    @retry_policy[:backoff_constant] || 1
  when :linear
    # Linear backoff (retry_count * base_delay)
    base_delay = @retry_policy[:backoff_base] || 1
    task.retry_count * base_delay
  when :exponential
    # Exponential backoff (base_delay * 2^retry_count)
    base_delay = @retry_policy[:backoff_base] || 1
    base_delay * (2**(task.retry_count - 1))
  else
    0
  end

  # Apply jitter if configured
  if @retry_policy[:backoff_jitter]
    jitter_factor = 0.25 # Default 25% jitter
    jitter = rand(-delay * jitter_factor..delay * jitter_factor)
    delay += jitter
  end

  # Sleep if there's a delay to apply
  if delay > 0
    Async do
      Async::Task.current.sleep(delay) if delay > 0
    end
  end
end

#cancel_planvoid

This method returns an undefined value.

Cancels execution of the entire plan



146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/agentic/plan_orchestrator.rb', line 146

def cancel_plan
  # Stop the reactor to cancel all async tasks
  @reactor&.stop

  # Move all pending and in_progress tasks to canceled state
  @execution_state[:pending].each do |task_id|
    transition_task_state(task_id, from: :pending, to: :canceled)
  end

  @execution_state[:in_progress].each do |task_id|
    transition_task_state(task_id, from: :in_progress, to: :canceled)
  end
end

#cancel_task(task_id) ⇒ Boolean

Cancels execution of a specific task

Parameters:

  • task_id (String)

    ID of the task to cancel

Returns:

  • (Boolean)

    True if the task was canceled, false otherwise



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/agentic/plan_orchestrator.rb', line 123

def cancel_task(task_id)
  # Can only cancel pending or in_progress tasks
  return false unless @execution_state[:pending].include?(task_id) ||
    @execution_state[:in_progress].include?(task_id)

  # If the task is pending, simply move it to canceled state
  if @execution_state[:pending].include?(task_id)
    transition_task_state(task_id, from: :pending, to: :canceled)
    return true
  end

  # If the task is in progress, cancel its Async task
  if @execution_state[:in_progress].include?(task_id) && @async_tasks[task_id]
    @async_tasks[task_id].stop
    transition_task_state(task_id, from: :in_progress, to: :canceled)
    return true
  end

  false
end

#execute_plan(agent_provider) ⇒ PlanExecutionResult

Executes the plan, respecting task dependencies and concurrency limits

Parameters:

  • agent_provider (Object)

    An object that provides agents for task execution

Returns:



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/agentic/plan_orchestrator.rb', line 74

def execute_plan(agent_provider)
  @reactor = Async do |reactor|
    @barrier = Async::Barrier.new
    @semaphore = Async::Semaphore.new(@concurrency_limit, parent: @barrier)

    # Track execution start time
    @execution_start_time = Time.now

    # Start with tasks that have no dependencies
    eligible_tasks = find_eligible_tasks

    # Initial execution of eligible tasks
    eligible_tasks.each do |task_id|
      schedule_task(task_id, agent_provider, @semaphore, @barrier)
    end

    # Wait for all tasks to complete
    @barrier.wait

    # Track execution completion time
    @execution_end_time = Time.now

    # Call plan completion hook
    @lifecycle_hooks[:plan_completed].call(
      plan_id: @plan_id,
      status: overall_status,
      execution_time: @execution_end_time - @execution_start_time,
      tasks: @tasks.transform_values(&:to_h),
      results: @results
    )
  ensure
    @barrier&.stop
    # Ensure execution_end_time is set even if an exception occurred
    @execution_end_time ||= Time.now
  end

  # Create and return a PlanExecutionResult
  PlanExecutionResult.new(
    plan_id: @plan_id,
    status: overall_status,
    execution_time: @execution_end_time - @execution_start_time,
    tasks: @tasks.transform_values(&:to_h),
    results: @results
  )
end

#find_eligible_tasksArray<String>

Finds tasks that are eligible for execution (have no dependencies)

Returns:

  • (Array<String>)

    IDs of eligible tasks



230
231
232
233
234
# File 'lib/agentic/plan_orchestrator.rb', line 230

def find_eligible_tasks
  @dependencies.select do |task_id, deps|
    deps.empty? && @execution_state[:pending].include?(task_id)
  end.keys
end

#overall_statusSymbol

Determines the overall status of the plan

Returns:

  • (Symbol)

    The overall status (:completed, :in_progress, or :partial_failure)



238
239
240
241
242
243
244
245
246
# File 'lib/agentic/plan_orchestrator.rb', line 238

def overall_status
  if @execution_state[:failed].any?
    :partial_failure
  elsif @execution_state[:pending].empty? && @execution_state[:in_progress].empty?
    :completed
  else
    :in_progress
  end
end

#requires_intervention?(failure:) ⇒ Boolean

Determines if a failure requires human intervention

Parameters:

Returns:

  • (Boolean)

    True if human intervention is required



176
177
178
179
# File 'lib/agentic/plan_orchestrator.rb', line 176

def requires_intervention?(failure:)
  # For now, we only identify a few error types that need human help
  %w[AuthenticationError PermissionDeniedError ConfigurationError].include?(failure.type)
end

#retry?(task:, failure:) ⇒ Boolean

Determines if a task failure is retryable based on retry policy

Parameters:

  • task (Task)

    The failed task

  • failure (TaskFailure)

    The failure details

Returns:

  • (Boolean)

    True if the task failure is retryable



164
165
166
167
168
169
170
171
# File 'lib/agentic/plan_orchestrator.rb', line 164

def retry?(task:, failure:)
  # Check if we've reached max retries
  task.retry_count ||= 0
  return false if task.retry_count >= @retry_policy[:max_retries]

  # Check if error type is in retryable_errors list
  @retry_policy[:retryable_errors].include?(failure.type)
end