Class: Pindo::TaskSystem::TaskManager

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/pindo/module/task/task_manager.rb

Overview

TaskManager - 任务管理器

职责:

  • 提供公共 API

  • 组合各个模块(队列、依赖检查、执行器、报告、资源锁)

  • 创建执行策略

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeTaskManager

Returns a new instance of TaskManager.



24
25
26
27
28
29
30
# File 'lib/pindo/module/task/task_manager.rb', line 24

def initialize
  @queue = TaskQueue.new                               # 队列管理
  @resource_lock_manager = ResourceLockManager.new     # 资源锁管理
  @dependency_checker = DependencyChecker.new(@queue, @resource_lock_manager)  # 依赖检查
  @reporter = TaskReporter.new(@queue)                 # 报告输出
  @output_manager = nil                                 # 输出管理器
end

Instance Attribute Details

#dependency_checkerObject (readonly)

Returns the value of attribute dependency_checker.



22
23
24
# File 'lib/pindo/module/task/task_manager.rb', line 22

def dependency_checker
  @dependency_checker
end

#queueObject (readonly)

Returns the value of attribute queue.



22
23
24
# File 'lib/pindo/module/task/task_manager.rb', line 22

def queue
  @queue
end

#reporterObject (readonly)

Returns the value of attribute reporter.



22
23
24
# File 'lib/pindo/module/task/task_manager.rb', line 22

def reporter
  @reporter
end

#resource_lock_managerObject (readonly)

Returns the value of attribute resource_lock_manager.



22
23
24
# File 'lib/pindo/module/task/task_manager.rb', line 22

def resource_lock_manager
  @resource_lock_manager
end

Instance Method Details

#add_task(task, options = {}) ⇒ String

添加任务

Parameters:

  • task (PindoTask)

    任务对象

  • options (Hash) (defaults to: {})

    选项

Returns:

  • (String)

    任务 ID

Raises:

  • (ArgumentError)


63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/pindo/module/task/task_manager.rb', line 63

def add_task(task, options = {})
  raise ArgumentError, "Task must be a PindoTask" unless task.is_a?(PindoTask)

  unless task.validate
    raise ArgumentError, "Task validation failed: #{task.name}"
  end

  if options[:wait_for]
    task.dependencies.concat(Array(options[:wait_for]))
  end

  @queue.add(task, sort_by_priority: true)
end

#add_tasks(tasks) ⇒ Array<String>

批量添加任务

Parameters:

Returns:

  • (Array<String>)

    任务 ID 数组



80
81
82
# File 'lib/pindo/module/task/task_manager.rb', line 80

def add_tasks(tasks)
  tasks.map { |task| add_task(task) }
end

#cancel_task(task_id) ⇒ Object

取消任务

Parameters:

  • task_id (String)

    任务 ID



149
150
151
152
# File 'lib/pindo/module/task/task_manager.rb', line 149

def cancel_task(task_id)
  task = find_task(task_id)
  task&.cancel
end

#clear_allObject

清空所有队列



135
136
137
# File 'lib/pindo/module/task/task_manager.rb', line 135

def clear_all
  @queue.clear_all
end

#disable_output_managementObject

禁用输出管理系统



54
55
56
57
# File 'lib/pindo/module/task/task_manager.rb', line 54

def disable_output_management
  @output_manager = nil
  @executor.output_manager = nil if @executor
end

#enable_output_management(options = {}) ⇒ Object

启用输出管理系统

Parameters:

  • options (Hash) (defaults to: {})

    配置选项



42
43
44
45
46
47
48
49
50
51
# File 'lib/pindo/module/task/task_manager.rb', line 42

def enable_output_management(options = {})
  @output_manager = MultiLineOutputManager.new(
    log_dir: options[:log_dir] || './pindo_logs',
    max_lines_per_task: options[:max_lines_per_task] || 5,
    max_recent_completed: options[:max_recent_completed] || 3,
    auto_adjust: options.fetch(:auto_adjust, true)
  )
  # 更新执行器的输出管理器(如果已初始化)
  @executor.output_manager = @output_manager if @executor
end

#execution_reportHash

获取执行报告

Returns:

  • (Hash)

    执行报告



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/pindo/module/task/task_manager.rb', line 112

def execution_report
  pending = @queue.pending_snapshot
  completed = @queue.completed_snapshot

  {
    pending: pending.count,
    completed: completed.count,
    success: completed.count { |t| t.status == TaskStatus::SUCCESS },
    failed: completed.count { |t| t.status == TaskStatus::FAILED },
    tasks: (pending + completed).map do |task|
      {
        id: task.id,
        name: task.name,
        type: task.type,
        status: task.status,
        error: task.error&.message,
        execution_time: task.execution_time
      }
    end
  }
end

#executorTaskExecutor

获取执行器(延迟初始化)

Returns:



36
37
38
# File 'lib/pindo/module/task/task_manager.rb', line 36

def executor
  @executor ||= TaskExecutor.new(@queue, @reporter, @resource_lock_manager, @output_manager)
end

#find_task(task_id) ⇒ PindoTask?

查找任务

Parameters:

  • task_id (String)

    任务 ID

Returns:



157
158
159
# File 'lib/pindo/module/task/task_manager.rb', line 157

def find_task(task_id)
  @queue.find(task_id)
end

#start(options = {}) ⇒ Object

开始执行任务

Parameters:

  • options (Hash) (defaults to: {})

    执行选项

Options Hash (options):

  • :mode (Symbol)

    执行模式 (:serial, :concurrent)

  • :concurrent (Boolean)

    快捷参数

  • :max_workers (Integer)

    最大工作线程数



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/pindo/module/task/task_manager.rb', line 89

def start(options = {})
  mode = parse_execution_mode(options)
  strategy = ExecutionStrategy.create(mode, options)

  # 如果配置了输出管理器,注册所有任务
  if @output_manager
    @queue.pending_snapshot.each do |task|
      @output_manager.register_task(task)
    end
  end

  # 输出任务执行计划
  @reporter.print_execution_plan(strategy)

  # 执行策略
  strategy.execute(self)

  # 输出执行摘要
  @reporter.print_execution_summary
end

#task_status(task_id) ⇒ Symbol?

获取任务状态

Parameters:

  • task_id (String)

    任务 ID

Returns:

  • (Symbol, nil)

    任务状态



142
143
144
145
# File 'lib/pindo/module/task/task_manager.rb', line 142

def task_status(task_id)
  task = find_task(task_id)
  task&.status
end