Class: Agentic::PlanOrchestrator
- Inherits: Object (hierarchy: Object, Agentic::PlanOrchestrator)
- Defined in: lib/agentic/plan_orchestrator.rb
Overview
Orchestrates the execution of tasks in a plan, handling task dependencies, a concurrency limit, retries, and lifecycle hooks.
Instance Attribute Summary
- #execution_state ⇒ Hash (readonly)
  Current state of all tasks in the plan: sets of task ids keyed by :pending, :in_progress, :completed, :failed, and :canceled.
- #lifecycle_hooks ⇒ Object (readonly)
  Lifecycle hooks keyed by hook name; each value is a callable (no-op by default).
- #plan_id ⇒ String (readonly)
  Unique identifier for the plan.
- #results ⇒ Hash (readonly)
  Results of task execution.
- #retry_policy ⇒ Object (readonly)
  Retry policy: the constructor defaults merged with any overrides passed in.
- #tasks ⇒ Hash (readonly)
  Map of task ids to Task objects.
Instance Method Summary
- #add_task(task, dependencies = []) ⇒ void
  Adds a task to the plan with optional dependencies.
- #all_dependencies_met?(task_id) ⇒ Boolean
  Checks if all dependencies for a task are met.
- #apply_retry_backoff(task:) ⇒ void
  Applies a delay based on the backoff strategy before retrying.
- #cancel_plan ⇒ void
  Cancels execution of the entire plan.
- #cancel_task(task_id) ⇒ Boolean
  Cancels execution of a specific task.
- #execute_plan(agent_provider) ⇒ PlanExecutionResult
  Executes the plan, respecting task dependencies and concurrency limits.
- #find_eligible_tasks ⇒ Array<String>
  Finds pending tasks that have no dependencies and can therefore start immediately.
- #initialize(plan_id: SecureRandom.uuid, concurrency_limit: 10, retry_policy: {}, lifecycle_hooks: {}) ⇒ PlanOrchestrator (constructor)
  Initializes a new plan orchestrator.
- #overall_status ⇒ Symbol
  Determines the overall status of the plan.
- #requires_intervention?(failure:) ⇒ Boolean
  Determines if a failure requires human intervention.
- #retry?(task:, failure:) ⇒ Boolean
  Determines if a task failure is retryable based on retry policy.
Constructor Details
#initialize(plan_id: SecureRandom.uuid, concurrency_limit: 10, retry_policy: {}, lifecycle_hooks: {}) ⇒ PlanOrchestrator
Initializes a new plan orchestrator
# File 'lib/agentic/plan_orchestrator.rb', line 27

def initialize(plan_id: SecureRandom.uuid, concurrency_limit: 10, retry_policy: {}, lifecycle_hooks: {})
  @plan_id = plan_id
  @tasks = {}
  @dependencies = {}
  @results = {}
  @execution_state = {
    pending: Set.new,
    in_progress: Set.new,
    completed: Set.new,
    failed: Set.new,
    canceled: Set.new
  }
  @concurrency_limit = concurrency_limit
  @async_tasks = {}

  # Configure retry policy with defaults
  @retry_policy = {
    max_retries: 3,
    retryable_errors: ["TimeoutError"],
    backoff_strategy: :constant
  }.merge(retry_policy)

  # Configure lifecycle hooks with callable defaults (no-ops)
  @lifecycle_hooks = {
    before_agent_build: ->(task_id:, task:) {}, # Called before an agent is built
    after_agent_build: ->(task_id:, task:, agent:, build_duration:) {}, # Called after an agent is built
    before_task_execution: ->(task_id:, task:) {}, # Called before a task is executed
    after_task_success: ->(task_id:, task:, result:, duration:) {}, # Called after a task succeeds
    after_task_failure: ->(task_id:, task:, failure:, duration:) {}, # Called after a task fails
    plan_completed: ->(plan_id:, status:, execution_time:, tasks:, results:) {} # Called when plan completes
  }.merge(lifecycle_hooks)
end
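The way the constructor combines defaults with caller-supplied options can be sketched in isolation. The snippet below is a minimal illustration that copies the defaults from the listing above and only reproduces the `merge` step; the `events` array and the `"t1"` task id are hypothetical, and no Agentic classes are involved.

```ruby
# Defaults copied from the constructor listing; only the merge is illustrated.
default_policy = {
  max_retries: 3,
  retryable_errors: ["TimeoutError"],
  backoff_strategy: :constant
}

# Overrides replace matching keys and add new ones; untouched keys keep defaults.
policy = default_policy.merge(backoff_strategy: :exponential, backoff_base: 2)

events = [] # hypothetical sink used to observe hook calls
default_hooks = {
  before_task_execution: ->(task_id:, task:) {},
  after_task_success: ->(task_id:, task:, result:, duration:) {}
}

# A custom hook accepts the same keywords as the default it replaces.
hooks = default_hooks.merge(
  after_task_success: ->(task_id:, task:, result:, duration:) { events << [task_id, result] }
)

hooks[:before_task_execution].call(task_id: "t1", task: nil) # default no-op
hooks[:after_task_success].call(task_id: "t1", task: nil, result: :ok, duration: 0.1)
```

Because `Hash#merge` is shallow, passing `retryable_errors:` replaces the default list rather than extending it, so a caller who still wants `"TimeoutError"` retried would need to include it explicitly.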
Instance Attribute Details
#execution_state ⇒ Hash (readonly)
Current state of all tasks in the plan
# File 'lib/agentic/plan_orchestrator.rb', line 18

def execution_state
  @execution_state
end

#lifecycle_hooks ⇒ Object (readonly)
Returns the lifecycle hooks: a Hash of callables keyed by hook name.
# File 'lib/agentic/plan_orchestrator.rb', line 19

def lifecycle_hooks
  @lifecycle_hooks
end

#plan_id ⇒ String (readonly)
Unique identifier for the plan
# File 'lib/agentic/plan_orchestrator.rb', line 18

def plan_id
  @plan_id
end

#results ⇒ Hash (readonly)
Results of task execution
# File 'lib/agentic/plan_orchestrator.rb', line 18

def results
  @results
end

#retry_policy ⇒ Object (readonly)
Returns the retry policy: the constructor defaults merged with any overrides.
# File 'lib/agentic/plan_orchestrator.rb', line 19

def retry_policy
  @retry_policy
end

#tasks ⇒ Hash (readonly)
Map of task ids to Task objects
# File 'lib/agentic/plan_orchestrator.rb', line 18

def tasks
  @tasks
end
Instance Method Details
#add_task(task, dependencies = []) ⇒ void
This method returns an undefined value.
Adds a task to the plan with optional dependencies
# File 'lib/agentic/plan_orchestrator.rb', line 64

def add_task(task, dependencies = [])
  task_id = task.id
  @tasks[task_id] = task
  @dependencies[task_id] = Array(dependencies)
  @execution_state[:pending].add(task_id)
end
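The bookkeeping done by add_task can be tried out with plain hashes. The sketch below mirrors the three assignments from the listing above; `SketchTask` and the `register` lambda are hypothetical stand-ins, since the real Task class is not shown on this page.

```ruby
require "set"

SketchTask = Struct.new(:id) # hypothetical stand-in for a task object

tasks = {}
dependencies = {}
pending = Set.new

# Mirrors the body of add_task: store the task, normalize deps, mark pending.
register = lambda do |task, deps = []|
  tasks[task.id] = task
  dependencies[task.id] = Array(deps)
  pending.add(task.id)
end

register.call(SketchTask.new("fetch"))
register.call(SketchTask.new("parse"), "fetch")             # single id, not an array
register.call(SketchTask.new("report"), ["fetch", "parse"])
register.call(SketchTask.new("cleanup"), nil)               # nil becomes []
```

`Array(...)` is what lets callers pass a single id, an array, or `nil`. The listing shows no check that a dependency id refers to a registered task, so validating ids (and avoiding cycles) appears to be left to the caller.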
#all_dependencies_met?(task_id) ⇒ Boolean
Checks if all dependencies for a task are met
# File 'lib/agentic/plan_orchestrator.rb', line 221

def all_dependencies_met?(task_id)
  deps = @dependencies[task_id] || []
  deps.all? do |dep_id|
    @execution_state[:completed].include?(dep_id)
  end
end
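A standalone version of the same predicate makes its edge cases easier to see. This is a sketch that takes the dependency map and the completed set as arguments (the real method reads instance variables); the task ids are made up for illustration.

```ruby
require "set"

# Same logic as all_dependencies_met?, with state passed in explicitly.
def dependencies_met?(task_id, dependencies, completed)
  (dependencies[task_id] || []).all? { |dep_id| completed.include?(dep_id) }
end

dependencies = { "fetch" => [], "parse" => ["fetch"], "report" => ["fetch", "parse"] }
completed = Set["fetch"]

parse_ready = dependencies_met?("parse", dependencies, completed)   # true
report_ready = dependencies_met?("report", dependencies, completed) # false, "parse" not done
unknown_ready = dependencies_met?("ghost", dependencies, completed) # true, no deps recorded
```

Only membership in the completed set counts, so a dependency that ends up failed or canceled never satisfies its dependents. An id with no recorded dependencies, including one that was never added, is reported as met.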
#apply_retry_backoff(task:) ⇒ void
This method returns an undefined value.
Applies a delay based on the backoff strategy before retrying
# File 'lib/agentic/plan_orchestrator.rb', line 184

def apply_retry_backoff(task:)
  return if @retry_policy[:backoff_strategy] == :none

  delay = case @retry_policy[:backoff_strategy]
  when :constant
    # Constant delay (default 1 second)
    @retry_policy[:backoff_constant] || 1
  when :linear
    # Linear backoff (retry_count * base_delay)
    base_delay = @retry_policy[:backoff_base] || 1
    task.retry_count * base_delay
  when :exponential
    # Exponential backoff (base_delay * 2^(retry_count - 1))
    base_delay = @retry_policy[:backoff_base] || 1
    base_delay * (2**(task.retry_count - 1))
  else
    0
  end

  # Apply jitter if configured
  if @retry_policy[:backoff_jitter]
    jitter_factor = 0.25 # Default 25% jitter
    jitter = rand(-delay * jitter_factor..delay * jitter_factor)
    delay += jitter
  end

  # Sleep if there's a delay to apply
  if delay > 0
    Async do
      Async::Task.current.sleep(delay) if delay > 0
    end
  end
end
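The delay arithmetic can be checked without sleeping. The helper below is a hypothetical pure function (not part of Agentic) that reproduces the `case` expression from the listing above, leaving out jitter so the results are deterministic.

```ruby
# Hypothetical helper: returns the delay in seconds that the listing above
# would compute before jitter, without sleeping.
def backoff_delay(policy, retry_count)
  case policy[:backoff_strategy]
  when :constant
    policy[:backoff_constant] || 1
  when :linear
    retry_count * (policy[:backoff_base] || 1)
  when :exponential
    (policy[:backoff_base] || 1) * (2**(retry_count - 1))
  else
    0 # :none and unrecognized strategies apply no delay
  end
end

exponential = { backoff_strategy: :exponential, backoff_base: 2 }
delays = (1..4).map { |attempt| backoff_delay(exponential, attempt) }
# delays is [2, 4, 8, 16]

linear_delay = backoff_delay({ backoff_strategy: :linear, backoff_base: 3 }, 2)
# linear_delay is 6
```

With the exponential strategy the first retry (`retry_count` of 1) waits exactly `backoff_base` seconds. A `retry_count` of 0 would give a negative exponent, which Ruby evaluates to a Rational (`2**-1` is `(1/2)`), so the formula assumes the count has been incremented before the backoff is applied.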
#cancel_plan ⇒ void
This method returns an undefined value.
Cancels execution of the entire plan
# File 'lib/agentic/plan_orchestrator.rb', line 146

def cancel_plan
  # Stop the reactor to cancel all async tasks
  @reactor&.stop

  # Move all pending and in_progress tasks to canceled state
  @execution_state[:pending].each do |task_id|
    transition_task_state(task_id, from: :pending, to: :canceled)
  end

  @execution_state[:in_progress].each do |task_id|
    transition_task_state(task_id, from: :in_progress, to: :canceled)
  end
end
#cancel_task(task_id) ⇒ Boolean
Cancels execution of a specific task. Returns true if the task was moved to the canceled state, and false if it was neither pending nor in progress (or was in progress without a tracked Async task).
# File 'lib/agentic/plan_orchestrator.rb', line 123

def cancel_task(task_id)
  # Can only cancel pending or in_progress tasks
  return false unless @execution_state[:pending].include?(task_id) ||
    @execution_state[:in_progress].include?(task_id)

  # If the task is pending, simply move it to canceled state
  if @execution_state[:pending].include?(task_id)
    transition_task_state(task_id, from: :pending, to: :canceled)
    return true
  end

  # If the task is in progress, cancel its Async task
  if @execution_state[:in_progress].include?(task_id) && @async_tasks[task_id]
    @async_tasks[task_id].stop
    transition_task_state(task_id, from: :in_progress, to: :canceled)
    return true
  end

  false
end
#execute_plan(agent_provider) ⇒ PlanExecutionResult
Executes the plan, respecting task dependencies and concurrency limits
# File 'lib/agentic/plan_orchestrator.rb', line 74

def execute_plan(agent_provider)
  @reactor = Async do |reactor|
    barrier = Async::Barrier.new
    @semaphore = Async::Semaphore.new(@concurrency_limit, parent: barrier)

    # Track execution start time
    @execution_start_time = Time.now

    # Start with tasks that have no dependencies
    eligible_tasks = find_eligible_tasks

    # Initial execution of eligible tasks
    eligible_tasks.each do |task_id|
      schedule_task(task_id, agent_provider, @semaphore, barrier)
    end

    # Wait for all tasks to complete
    barrier.wait

    # Track execution completion time
    @execution_end_time = Time.now

    # Call plan completion hook
    @lifecycle_hooks[:plan_completed].call(
      plan_id: @plan_id,
      status: overall_status,
      execution_time: @execution_end_time - @execution_start_time,
      tasks: @tasks.transform_values(&:to_h),
      results: @results
    )
  ensure
    barrier&.stop
    # Ensure execution_end_time is set even if an exception occurred
    @execution_end_time ||= Time.now
  end

  # Create and return a PlanExecutionResult
  PlanExecutionResult.new(
    plan_id: @plan_id,
    status: overall_status,
    execution_time: @execution_end_time - @execution_start_time,
    tasks: @tasks.transform_values(&:to_h),
    results: @results
  )
end
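The effect of a concurrency limit can be illustrated without the async gem. The sketch below is not how execute_plan is implemented: it swaps Async::Semaphore and Async::Barrier for standard-library threads, with a SizedQueue acting as a counting semaphore and `Thread#join` acting as the barrier. The helper name `run_with_limit` and the job lambdas are hypothetical.

```ruby
# Runs every job with at most `limit` jobs in flight at once.
# Stand-in for the semaphore/barrier pair in execute_plan, using stdlib threads.
def run_with_limit(jobs, limit:)
  slots = SizedQueue.new(limit) # push blocks once `limit` tokens are held
  lock = Mutex.new
  in_flight = 0
  peak = 0
  results = {}

  threads = jobs.map do |id, job|
    Thread.new do
      slots.push(:token) # acquire a slot
      begin
        lock.synchronize do
          in_flight += 1
          peak = [peak, in_flight].max
        end
        value = job.call
        lock.synchronize { results[id] = value }
      ensure
        lock.synchronize { in_flight -= 1 }
        slots.pop # release the slot
      end
    end
  end

  threads.each(&:join) # wait for all jobs, like barrier.wait
  { results: results, peak: peak }
end

jobs = (1..6).to_h { |n| ["task-#{n}", -> { sleep 0.01; n * 2 }] }
outcome = run_with_limit(jobs, limit: 2)
# outcome[:peak] never exceeds 2; outcome[:results] has one entry per job
```

The same shape applies to the listing above: every scheduled task passes through the semaphore, so no more than `concurrency_limit` tasks (10 by default) run at a time, and the plan only finishes once the barrier has seen all of them complete.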
#find_eligible_tasks ⇒ Array<String>
Finds pending tasks that have no dependencies and can therefore start immediately
# File 'lib/agentic/plan_orchestrator.rb', line 230

def find_eligible_tasks
  @dependencies.select do |task_id, deps|
    deps.empty? && @execution_state[:pending].include?(task_id)
  end.keys
end
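A small dependency map shows which ids this selection returns. The sketch below passes the state in as arguments instead of reading instance variables; the task ids are illustrative only.

```ruby
require "set"

# Same selection as find_eligible_tasks, with state passed in explicitly.
def eligible_tasks(dependencies, pending)
  dependencies.select { |task_id, deps| deps.empty? && pending.include?(task_id) }.keys
end

dependencies = { "fetch" => [], "lint" => [], "parse" => ["fetch"] }
pending = Set["fetch", "lint", "parse"]

first_wave = eligible_tasks(dependencies, pending)
# first_wave is ["fetch", "lint"]

pending.delete("fetch") # pretend "fetch" has started or finished
second_wave = eligible_tasks(dependencies, pending)
# second_wave is ["lint"]; "parse" is never returned because its deps list is non-empty
```

This method therefore only seeds the first wave of execution. Tasks with dependencies have to be picked up elsewhere once their prerequisites complete, presumably by the private scheduling code that is not shown on this page.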
#overall_status ⇒ Symbol
Determines the overall status of the plan
# File 'lib/agentic/plan_orchestrator.rb', line 238

def overall_status
  if @execution_state[:failed].any?
    :partial_failure
  elsif @execution_state[:pending].empty? && @execution_state[:in_progress].empty?
    :completed
  else
    :in_progress
  end
end
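The precedence of the three outcomes is easiest to see with a few sample states. The sketch below reimplements the conditional as a function of the state hash; `state_with` is a hypothetical helper for building test states.

```ruby
require "set"

# Same conditional as overall_status, taking the state hash as an argument.
def plan_status(state)
  if state[:failed].any?
    :partial_failure
  elsif state[:pending].empty? && state[:in_progress].empty?
    :completed
  else
    :in_progress
  end
end

# Hypothetical helper: an empty state with selected sets overridden.
def state_with(overrides = {})
  { pending: Set.new, in_progress: Set.new, completed: Set.new,
    failed: Set.new, canceled: Set.new }.merge(overrides)
end

failing_while_running = plan_status(state_with(failed: Set["a"], in_progress: Set["b"]))
all_canceled = plan_status(state_with(canceled: Set["a", "b"]))
still_waiting = plan_status(state_with(completed: Set["a"], pending: Set["b"]))
```

Two consequences follow from the order of the branches: a single failed task yields `:partial_failure` even while other tasks are still running, and a plan whose tasks were all canceled reports `:completed` because the canceled set is never consulted.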
#requires_intervention?(failure:) ⇒ Boolean
Determines if a failure requires human intervention
# File 'lib/agentic/plan_orchestrator.rb', line 176

def requires_intervention?(failure:)
  # For now, we only identify a few error types that need human help
  %w[AuthenticationError PermissionDeniedError ConfigurationError].include?(failure.type)
end
#retry?(task:, failure:) ⇒ Boolean
Determines if a task failure is retryable based on retry policy
# File 'lib/agentic/plan_orchestrator.rb', line 164

def retry?(task:, failure:)
  # Check if we've reached max retries
  task.retry_count ||= 0
  return false if task.retry_count >= @retry_policy[:max_retries]

  # Check if error type is in retryable_errors list
  @retry_policy[:retryable_errors].include?(failure.type)
end
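The two failure predicates, requires_intervention? and retry?, can be combined into a single triage step. The sketch below is one possible ordering, not the orchestrator's documented behavior: the page does not show where or in what order the predicates are called. `TriageTask`, `TriageFailure`, and `triage` are hypothetical names.

```ruby
# Hypothetical stand-ins; the real task and failure classes are not shown here.
TriageTask = Struct.new(:id, :retry_count)
TriageFailure = Struct.new(:type)

# Error types copied from the requires_intervention? listing.
INTERVENTION_TYPES = %w[AuthenticationError PermissionDeniedError ConfigurationError].freeze

# Assumed ordering: check for human intervention first, then the retry policy.
def triage(task, failure, policy)
  return :needs_human if INTERVENTION_TYPES.include?(failure.type)

  task.retry_count ||= 0 # same nil guard as retry?
  if task.retry_count < policy[:max_retries] && policy[:retryable_errors].include?(failure.type)
    :retry
  else
    :give_up
  end
end

policy = { max_retries: 3, retryable_errors: ["TimeoutError"] }

first_timeout = triage(TriageTask.new("a", nil), TriageFailure.new("TimeoutError"), policy)
exhausted = triage(TriageTask.new("a", 3), TriageFailure.new("TimeoutError"), policy)
auth_problem = triage(TriageTask.new("a", 0), TriageFailure.new("AuthenticationError"), policy)
other_name = triage(TriageTask.new("a", 0), TriageFailure.new("Timeout::Error"), policy)
```

Both predicates compare `failure.type` against plain strings, so the match is exact: `"Timeout::Error"` is not treated as `"TimeoutError"` unless it is added to `retryable_errors`.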