Class: Pindo::TaskSystem::TaskQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/pindo/module/task/core/task_queue.rb

Overview

TaskQueue - 任务队列管理

职责:

  • 管理待执行队列、已完成队列和当前任务

  • 提供线程安全的队列操作

  • 提供原子操作 API

  • 提供高级查询功能(分组、统计)

Instance Method Summary collapse

Constructor Details

#initializeTaskQueue



13
14
15
16
17
18
19
# File 'lib/pindo/module/task/core/task_queue.rb', line 13

def initialize
  @pending = []      # 待执行队列
  @running = []      # 正在运行队列(用于并发执行)
  @completed = []    # 已完成队列
  @current = nil     # 当前正在执行的任务(用于串行执行)
  @mutex = Mutex.new # 线程安全锁
end

Instance Method Details

#add(task, sort_by_priority: true) ⇒ String

添加任务(线程安全)



27
28
29
30
31
32
33
# File 'lib/pindo/module/task/core/task_queue.rb', line 27

def add(task, sort_by_priority: true)
  @mutex.synchronize do
    @pending << task
    @pending.sort_by! { |t| -t.priority } if sort_by_priority
    task.id
  end
end

#add_all(tasks) ⇒ Array<String>

批量添加任务



38
39
40
41
42
43
44
# File 'lib/pindo/module/task/core/task_queue.rb', line 38

def add_all(tasks)
  @mutex.synchronize do
    @pending.concat(tasks)
    @pending.sort_by! { |t| -t.priority }
    tasks.map(&:id)
  end
end

#add_running(task) ⇒ Object

添加正在运行的任务(并发执行使用)



73
74
75
# File 'lib/pindo/module/task/core/task_queue.rb', line 73

def add_running(task)
  @mutex.synchronize { @running << task unless @running.include?(task) }
end

#all_pending? {|task| ... } ⇒ Boolean

检查所有待执行任务是否满足条件

Yields:

  • (task)

    检查条件

Yield Parameters:

Yield Returns:

  • (Boolean)

    是否满足



214
215
216
217
218
# File 'lib/pindo/module/task/core/task_queue.rb', line 214

def all_pending?(&block)
  @mutex.synchronize do
    @pending.all?(&block)
  end
end

#all_tasksArray<PindoTask>

获取所有任务



190
191
192
193
194
195
196
# File 'lib/pindo/module/task/core/task_queue.rb', line 190

def all_tasks
  @mutex.synchronize do
    tasks = @pending + @running + @completed
    tasks << @current if @current
    tasks
  end
end

#clear_allObject

清空所有队列



179
180
181
182
183
184
185
186
# File 'lib/pindo/module/task/core/task_queue.rb', line 179

def clear_all
  @mutex.synchronize do
    @pending.clear
    @running.clear
    @completed.clear
    @current = nil
  end
end

#complete_currentPindoTask?

完成当前任务(原子操作)



86
87
88
89
90
91
92
93
94
# File 'lib/pindo/module/task/core/task_queue.rb', line 86

def complete_current
  @mutex.synchronize do
    return nil unless @current
    task = @current
    @completed << task
    @current = nil
    task
  end
end

#completed_countInteger

获取已完成任务数量



172
173
174
# File 'lib/pindo/module/task/core/task_queue.rb', line 172

def completed_count
  @mutex.synchronize { @completed.size }
end

#completed_count_by_statusHash{Symbol => Integer}

按状态统计已完成任务



135
136
137
138
139
# File 'lib/pindo/module/task/core/task_queue.rb', line 135

def completed_count_by_status
  @mutex.synchronize do
    @completed.group_by(&:status).transform_values(&:count)
  end
end

#completed_snapshotArray<PindoTask>

返回已完成队列快照



123
124
125
# File 'lib/pindo/module/task/core/task_queue.rb', line 123

def completed_snapshot
  @mutex.synchronize { @completed.dup }
end

#currentPindoTask?

获取当前任务



67
68
69
# File 'lib/pindo/module/task/core/task_queue.rb', line 67

def current
  @mutex.synchronize { @current }
end

#find(task_id) ⇒ PindoTask?

查找任务(在所有队列中)



144
145
146
147
148
149
150
# File 'lib/pindo/module/task/core/task_queue.rb', line 144

def find(task_id)
  @mutex.synchronize do
    all_tasks = @pending + @running + @completed
    all_tasks << @current if @current
    all_tasks.find { |t| t.id == task_id }
  end
end

#find_and_remove {|task| ... } ⇒ PindoTask?

查找并移除任务(原子操作)

Yields:

  • (task)

    查找条件

Yield Parameters:

Yield Returns:

  • (Boolean)

    是否匹配



51
52
53
54
55
56
57
# File 'lib/pindo/module/task/core/task_queue.rb', line 51

def find_and_remove(&block)
  @mutex.synchronize do
    task = @pending.find(&block)
    @pending.delete(task) if task
    task
  end
end

#find_pending {|task| ... } ⇒ PindoTask?

条件查找(在待执行队列中)

Yields:

  • (task)

    查找条件

Yield Parameters:

Yield Returns:

  • (Boolean)

    是否匹配



203
204
205
206
207
# File 'lib/pindo/module/task/core/task_queue.rb', line 203

def find_pending(&block)
  @mutex.synchronize do
    @pending.find(&block)
  end
end

#has_current?Boolean

检查是否有当前任务



160
161
162
# File 'lib/pindo/module/task/core/task_queue.rb', line 160

def has_current?
  @mutex.synchronize { !@current.nil? }
end

#mark_completed(task) ⇒ Object

添加到已完成队列



107
108
109
110
111
# File 'lib/pindo/module/task/core/task_queue.rb', line 107

def mark_completed(task)
  @mutex.synchronize do
    @completed << task unless @completed.include?(task)
  end
end

#pending_countInteger

获取待执行任务数量



166
167
168
# File 'lib/pindo/module/task/core/task_queue.rb', line 166

def pending_count
  @mutex.synchronize { @pending.size }
end

#pending_empty?Boolean

检查待执行队列是否为空



154
155
156
# File 'lib/pindo/module/task/core/task_queue.rb', line 154

def pending_empty?
  @mutex.synchronize { @pending.empty? }
end

#pending_grouped_by_typeHash{Symbol => Array<PindoTask>}

按类型分组待执行任务



129
130
131
# File 'lib/pindo/module/task/core/task_queue.rb', line 129

def pending_grouped_by_type
  @mutex.synchronize { @pending.group_by(&:type) }
end

#pending_snapshotArray<PindoTask>

返回待执行队列快照(不持锁遍历)



117
118
119
# File 'lib/pindo/module/task/core/task_queue.rb', line 117

def pending_snapshot
  @mutex.synchronize { @pending.dup }
end

#remove_pending(task) ⇒ Boolean

移除待执行任务(用于标记失败/取消)



99
100
101
102
103
# File 'lib/pindo/module/task/core/task_queue.rb', line 99

def remove_pending(task)
  @mutex.synchronize do
    @pending.delete(task) != nil
  end
end

#remove_running(task) ⇒ Boolean

移除正在运行的任务(并发执行使用)



80
81
82
# File 'lib/pindo/module/task/core/task_queue.rb', line 80

def remove_running(task)
  @mutex.synchronize { @running.delete(task) != nil }
end

#set_current(task) ⇒ Object

设置当前任务(串行执行使用)



61
62
63
# File 'lib/pindo/module/task/core/task_queue.rb', line 61

def set_current(task)
  @mutex.synchronize { @current = task }
end