Class: Pindo::TaskSystem::ConcurrentExecutionStrategy

Inherits:
ExecutionStrategy show all
Defined in:
lib/pindo/module/task/core/concurrent_execution_strategy.rb

Overview

ConcurrentExecutionStrategy - 并发执行策略

职责:

  • 在多线程中并发执行独立任务

  • 控制最大工作线程数

  • 处理线程安全和任务同步

Constant Summary collapse

STARVATION_THRESHOLD =

饥饿阈值:任务被跳过多少次后进入饥饿模式

5

Instance Method Summary collapse

Methods inherited from ExecutionStrategy

create, detect_optimal_workers

Constructor Details

#initialize(max_workers: 4) ⇒ ConcurrentExecutionStrategy

初始化



16
17
18
19
20
21
22
23
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 16

def initialize(max_workers: 4)
  @max_workers = max_workers
  @mutex = Mutex.new
  @running_tasks = []  # 正在运行的任务
  @threads = []        # 工作线程
  @threads = []        # 工作线程
  @completion_queue = Queue.new  # 任务完成通知队列
end

Instance Method Details

#execute(task_manager) ⇒ Object

执行任务



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 30

def execute(task_manager)
  queue = task_manager.queue
  checker = task_manager.dependency_checker
  executor = task_manager.executor

  loop do
    # 启动新任务(直到达到 max_workers 上限)
    start_pending_tasks(queue, checker, executor, task_manager)

    # 退出条件:无正在运行的任务 + 待执行队列为空
    if no_running_tasks? && queue.pending_empty?
      Funlog.info("[并发策略] 退出循环:无运行任务且队列为空")
      break
    end

    # 如果所有任务都因依赖问题阻塞,且没有正在运行的任务
    if no_running_tasks? && checker.all_pending_blocked?
      pending_count = queue.pending_snapshot.size
      Funlog.warning("[并发策略] 所有剩余任务都因依赖问题无法执行 (待执行: #{pending_count})")
      # 打印所有待执行任务的状态
      queue.pending_snapshot.each do |task|
        dep_status = checker.check_dependencies(task)
        Funlog.warning("  - 任务: #{task.name}, 依赖检查结果: #{dep_status}")
      end
      break
    end

    # 等待至少一个任务完成
    wait_for_task_completion if has_running_tasks?
  end

  # 等待所有线程完成
  @threads.each(&:join)
end

#nameString

策略名称



67
68
69
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 67

def name
  "并发执行 (#{@max_workers} 线程)"
end