Class: Pindo::TaskSystem::ConcurrentExecutionStrategy

Inherits:
ExecutionStrategy show all
Defined in:
lib/pindo/module/task/core/concurrent_execution_strategy.rb

Overview

ConcurrentExecutionStrategy - 并发执行策略

职责:

  • 在多线程中并发执行独立任务

  • 控制最大工作线程数

  • 处理线程安全和任务同步

Constant Summary collapse

STARVATION_THRESHOLD =

饥饿阈值:任务被跳过多少次后进入饥饿模式

5

Instance Method Summary collapse

Methods inherited from ExecutionStrategy

create, detect_optimal_workers

Constructor Details

#initialize(max_workers: 4) ⇒ ConcurrentExecutionStrategy

初始化

Parameters:

  • max_workers (Integer) (defaults to: 4)

    最大工作线程数



16
17
18
19
20
21
22
23
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 16

def initialize(max_workers: 4)
  @max_workers = max_workers
  @mutex = Mutex.new
  @running_tasks = []  # 正在运行的任务
  @threads = []        # 工作线程
  @threads = []        # 工作线程
  @completion_queue = Queue.new  # 任务完成通知队列
end

Instance Method Details

#execute(task_manager) ⇒ Object

执行任务

Parameters:



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 30

def execute(task_manager)
  queue = task_manager.queue
  checker = task_manager.dependency_checker
  executor = task_manager.executor

  loop do
    # 启动新任务(直到达到 max_workers 上限)
    start_pending_tasks(queue, checker, executor, task_manager)

    # 退出条件:无正在运行的任务 + 待执行队列为空
    if no_running_tasks? && queue.pending_empty?
      Funlog.info("[并发策略] 退出循环:无运行任务且队列为空")
      break
    end

    # 如果所有任务都因依赖问题阻塞,且没有正在运行的任务
    if no_running_tasks? && checker.all_pending_blocked?
      pending_count = queue.pending_snapshot.size
      Funlog.warning("[并发策略] 所有剩余任务都因依赖问题无法执行 (待执行: #{pending_count})")
      # 打印所有待执行任务的状态
      queue.pending_snapshot.each do |task|
        dep_status = checker.check_dependencies(task)
        Funlog.warning("  - 任务: #{task.name}, 依赖检查结果: #{dep_status}")
      end
      break
    end

    # 等待至少一个任务完成
    wait_for_task_completion if has_running_tasks?
  end

  # 等待所有线程完成
  @threads.each(&:join)
end

#nameString

策略名称

Returns:

  • (String)


67
68
69
# File 'lib/pindo/module/task/core/concurrent_execution_strategy.rb', line 67

def name
  "并发执行 (#{@max_workers} 线程)"
end