Class: Pindo::TaskSystem::ConcurrentExecutionStrategy
- Inherits:
-
ExecutionStrategy
- Object
- ExecutionStrategy
- Pindo::TaskSystem::ConcurrentExecutionStrategy
- Defined in:
- lib/pindo/module/task/core/concurrent_execution_strategy.rb
Overview
ConcurrentExecutionStrategy - 并发执行策略
职责:
-
在多线程中并发执行独立任务
-
控制最大工作线程数
-
处理线程安全和任务同步
Constant Summary collapse
- STARVATION_THRESHOLD =
饥饿阈值:任务被跳过多少次后进入饥饿模式
5
Instance Method Summary collapse
-
#execute(task_manager) ⇒ Object
执行任务.
-
#initialize(max_workers: 4) ⇒ ConcurrentExecutionStrategy
constructor
初始化.
-
#name ⇒ String
策略名称.
Methods inherited from ExecutionStrategy
create, detect_optimal_workers
Constructor Details
#initialize(max_workers: 4) ⇒ ConcurrentExecutionStrategy
初始化
16 17 18 19 20 21 22 23 |
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 16 def initialize(max_workers: 4) @max_workers = max_workers @mutex = Mutex.new @running_tasks = [] # 正在运行的任务 @threads = [] # 工作线程 @threads = [] # 工作线程 @completion_queue = Queue.new # 任务完成通知队列 end |
Instance Method Details
#execute(task_manager) ⇒ Object
执行任务
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 30 def execute(task_manager) queue = task_manager.queue checker = task_manager.dependency_checker executor = task_manager.executor loop do # 启动新任务(直到达到 max_workers 上限) start_pending_tasks(queue, checker, executor, task_manager) # 退出条件:无正在运行的任务 + 待执行队列为空 if no_running_tasks? && queue.pending_empty? Funlog.info("[并发策略] 退出循环:无运行任务且队列为空") break end # 如果所有任务都因依赖问题阻塞,且没有正在运行的任务 if no_running_tasks? && checker.all_pending_blocked? pending_count = queue.pending_snapshot.size Funlog.warning("[并发策略] 所有剩余任务都因依赖问题无法执行 (待执行: #{pending_count})") # 打印所有待执行任务的状态 queue.pending_snapshot.each do |task| dep_status = checker.check_dependencies(task) Funlog.warning(" - 任务: #{task.name}, 依赖检查结果: #{dep_status}") end break end # 等待至少一个任务完成 wait_for_task_completion if has_running_tasks? end # 等待所有线程完成 @threads.each(&:join) end |
#name ⇒ String
策略名称
67 68 69 |
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 67 def name "并发执行 (#{@max_workers} 线程)" end |