Class: Pindo::TaskSystem::TaskManager
- Inherits:
-
Object
- Object
- Pindo::TaskSystem::TaskManager
- Includes:
- Singleton
- Defined in:
- lib/pindo/module/task/task_manager.rb
Overview
TaskManager - 任务管理器
职责:
-
提供公共 API
-
组合各个模块(队列、依赖检查、执行器、报告、资源锁)
-
创建执行策略
Instance Attribute Summary collapse
-
#dependency_checker ⇒ Object
readonly
Returns the value of attribute dependency_checker.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#reporter ⇒ Object
readonly
Returns the value of attribute reporter.
-
#resource_lock_manager ⇒ Object
readonly
Returns the value of attribute resource_lock_manager.
Instance Method Summary collapse
-
#add_task(task, options = {}) ⇒ String
添加任务.
-
#add_tasks(tasks) ⇒ Array<String>
批量添加任务.
-
#cancel_task(task_id) ⇒ Object
取消任务.
-
#clear_all ⇒ Object
清空所有队列.
-
#disable_output_management ⇒ Object
禁用输出管理系统.
-
#enable_output_management(options = {}) ⇒ Object
启用输出管理系统.
-
#execution_report ⇒ Hash
获取执行报告.
-
#executor ⇒ TaskExecutor
获取执行器(延迟初始化).
-
#find_task(task_id) ⇒ PindoTask?
查找任务.
-
#initialize ⇒ TaskManager
constructor
A new instance of TaskManager.
-
#start(options = {}) ⇒ Object
开始执行任务.
-
#task_status(task_id) ⇒ Symbol?
获取任务状态.
Constructor Details
#initialize ⇒ TaskManager
Returns a new instance of TaskManager.
24 25 26 27 28 29 30 |
# File 'lib/pindo/module/task/task_manager.rb', line 24 def initialize @queue = TaskQueue.new # 队列管理 @resource_lock_manager = ResourceLockManager.new # 资源锁管理 @dependency_checker = DependencyChecker.new(@queue, @resource_lock_manager) # 依赖检查 @reporter = TaskReporter.new(@queue) # 报告输出 @output_manager = nil # 输出管理器 end |
Instance Attribute Details
#dependency_checker ⇒ Object (readonly)
Returns the value of attribute dependency_checker.
22 23 24 |
# File 'lib/pindo/module/task/task_manager.rb', line 22 def dependency_checker @dependency_checker end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
22 23 24 |
# File 'lib/pindo/module/task/task_manager.rb', line 22 def queue @queue end |
#reporter ⇒ Object (readonly)
Returns the value of attribute reporter.
22 23 24 |
# File 'lib/pindo/module/task/task_manager.rb', line 22 def reporter @reporter end |
#resource_lock_manager ⇒ Object (readonly)
Returns the value of attribute resource_lock_manager.
22 23 24 |
# File 'lib/pindo/module/task/task_manager.rb', line 22 def resource_lock_manager @resource_lock_manager end |
Instance Method Details
#add_task(task, options = {}) ⇒ String
添加任务
63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/pindo/module/task/task_manager.rb', line 63 def add_task(task, = {}) raise ArgumentError, "Task must be a PindoTask" unless task.is_a?(PindoTask) unless task.validate raise ArgumentError, "Task validation failed: #{task.name}" end if [:wait_for] task.dependencies.concat(Array([:wait_for])) end @queue.add(task, sort_by_priority: true) end |
#add_tasks(tasks) ⇒ Array<String>
批量添加任务
80 81 82 |
# File 'lib/pindo/module/task/task_manager.rb', line 80 def add_tasks(tasks) tasks.map { |task| add_task(task) } end |
#cancel_task(task_id) ⇒ Object
取消任务
149 150 151 152 |
# File 'lib/pindo/module/task/task_manager.rb', line 149 def cancel_task(task_id) task = find_task(task_id) task&.cancel end |
#clear_all ⇒ Object
清空所有队列
135 136 137 |
# File 'lib/pindo/module/task/task_manager.rb', line 135 def clear_all @queue.clear_all end |
#disable_output_management ⇒ Object
禁用输出管理系统
54 55 56 57 |
# File 'lib/pindo/module/task/task_manager.rb', line 54 def disable_output_management @output_manager = nil @executor.output_manager = nil if @executor end |
#enable_output_management(options = {}) ⇒ Object
启用输出管理系统
42 43 44 45 46 47 48 49 50 51 |
# File 'lib/pindo/module/task/task_manager.rb', line 42 def enable_output_management( = {}) @output_manager = MultiLineOutputManager.new( log_dir: [:log_dir] || './pindo_logs', max_lines_per_task: [:max_lines_per_task] || 5, max_recent_completed: [:max_recent_completed] || 3, auto_adjust: .fetch(:auto_adjust, true) ) # 更新执行器的输出管理器(如果已初始化) @executor.output_manager = @output_manager if @executor end |
#execution_report ⇒ Hash
获取执行报告
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/pindo/module/task/task_manager.rb', line 112 def execution_report pending = @queue.pending_snapshot completed = @queue.completed_snapshot { pending: pending.count, completed: completed.count, success: completed.count { |t| t.status == TaskStatus::SUCCESS }, failed: completed.count { |t| t.status == TaskStatus::FAILED }, tasks: (pending + completed).map do |task| { id: task.id, name: task.name, type: task.type, status: task.status, error: task.error&., execution_time: task.execution_time } end } end |
#executor ⇒ TaskExecutor
获取执行器(延迟初始化)
36 37 38 |
# File 'lib/pindo/module/task/task_manager.rb', line 36 def executor @executor ||= TaskExecutor.new(@queue, @reporter, @resource_lock_manager, @output_manager) end |
#find_task(task_id) ⇒ PindoTask?
查找任务
157 158 159 |
# File 'lib/pindo/module/task/task_manager.rb', line 157 def find_task(task_id) @queue.find(task_id) end |
#start(options = {}) ⇒ Object
开始执行任务
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/pindo/module/task/task_manager.rb', line 89 def start( = {}) mode = parse_execution_mode() strategy = ExecutionStrategy.create(mode, ) # 如果配置了输出管理器,注册所有任务 if @output_manager @queue.pending_snapshot.each do |task| @output_manager.register_task(task) end end # 输出任务执行计划 @reporter.print_execution_plan(strategy) # 执行策略 strategy.execute(self) # 输出执行摘要 @reporter.print_execution_summary end |
#task_status(task_id) ⇒ Symbol?
获取任务状态
142 143 144 145 |
# File 'lib/pindo/module/task/task_manager.rb', line 142 def task_status(task_id) task = find_task(task_id) task&.status end |