Class: Pindo::TaskSystem::TaskQueue
- Inherits:
-
Object
- Object
- Pindo::TaskSystem::TaskQueue
- Defined in:
- lib/pindo/module/task/core/task_queue.rb
Overview
TaskQueue - 任务队列管理
职责:
-
管理待执行队列、已完成队列和当前任务
-
提供线程安全的队列操作
-
提供原子操作 API
-
提供高级查询功能(分组、统计)
Instance Method Summary collapse
-
#add(task, sort_by_priority: true) ⇒ String
添加任务(线程安全).
-
#add_all(tasks) ⇒ Array<String>
批量添加任务.
-
#add_running(task) ⇒ Object
添加正在运行的任务(并发执行使用).
-
#all_pending? {|task| ... } ⇒ Boolean
检查所有待执行任务是否满足条件.
-
#all_tasks ⇒ Array<PindoTask>
获取所有任务.
-
#clear_all ⇒ Object
清空所有队列.
-
#complete_current ⇒ PindoTask?
完成当前任务(原子操作).
-
#completed_count ⇒ Integer
获取已完成任务数量.
-
#completed_count_by_status ⇒ Hash{Symbol => Integer}
按状态统计已完成任务.
-
#completed_snapshot ⇒ Array<PindoTask>
返回已完成队列快照.
-
#current ⇒ PindoTask?
获取当前任务.
-
#find(task_id) ⇒ PindoTask?
查找任务(在所有队列中).
-
#find_and_remove {|task| ... } ⇒ PindoTask?
查找并移除任务(原子操作).
-
#find_pending {|task| ... } ⇒ PindoTask?
条件查找(在待执行队列中).
-
#has_current? ⇒ Boolean
检查是否有当前任务.
-
#initialize ⇒ TaskQueue
constructor
A new instance of TaskQueue.
-
#mark_completed(task) ⇒ Object
添加到已完成队列.
-
#pending_count ⇒ Integer
获取待执行任务数量.
-
#pending_empty? ⇒ Boolean
检查待执行队列是否为空.
-
#pending_grouped_by_type ⇒ Hash{Symbol => Array<PindoTask>}
按类型分组待执行任务.
-
#pending_snapshot ⇒ Array<PindoTask>
返回待执行队列快照(不持锁遍历).
-
#remove_pending(task) ⇒ Boolean
移除待执行任务(用于标记失败/取消).
-
#remove_running(task) ⇒ Boolean
移除正在运行的任务(并发执行使用).
-
#set_current(task) ⇒ Object
设置当前任务(串行执行使用).
Constructor Details
#initialize ⇒ TaskQueue
13 14 15 16 17 18 19 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 13 def initialize @pending = [] # 待执行队列 @running = [] # 正在运行队列(用于并发执行) @completed = [] # 已完成队列 @current = nil # 当前正在执行的任务(用于串行执行) @mutex = Mutex.new # 线程安全锁 end |
Instance Method Details
#add(task, sort_by_priority: true) ⇒ String
添加任务(线程安全)
27 28 29 30 31 32 33 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 27 def add(task, sort_by_priority: true) @mutex.synchronize do @pending << task @pending.sort_by! { |t| -t.priority } if sort_by_priority task.id end end |
#add_all(tasks) ⇒ Array<String>
批量添加任务
38 39 40 41 42 43 44 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 38 def add_all(tasks) @mutex.synchronize do @pending.concat(tasks) @pending.sort_by! { |t| -t.priority } tasks.map(&:id) end end |
#add_running(task) ⇒ Object
添加正在运行的任务(并发执行使用)
73 74 75 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 73 def add_running(task) @mutex.synchronize { @running << task unless @running.include?(task) } end |
#all_pending? {|task| ... } ⇒ Boolean
检查所有待执行任务是否满足条件
214 215 216 217 218 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 214 def all_pending?(&block) @mutex.synchronize do @pending.all?(&block) end end |
#all_tasks ⇒ Array<PindoTask>
获取所有任务
190 191 192 193 194 195 196 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 190 def all_tasks @mutex.synchronize do tasks = @pending + @running + @completed tasks << @current if @current tasks end end |
#clear_all ⇒ Object
清空所有队列
179 180 181 182 183 184 185 186 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 179 def clear_all @mutex.synchronize do @pending.clear @running.clear @completed.clear @current = nil end end |
#complete_current ⇒ PindoTask?
完成当前任务(原子操作)
86 87 88 89 90 91 92 93 94 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 86 def complete_current @mutex.synchronize do return nil unless @current task = @current @completed << task @current = nil task end end |
#completed_count ⇒ Integer
获取已完成任务数量
172 173 174 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 172 def completed_count @mutex.synchronize { @completed.size } end |
#completed_count_by_status ⇒ Hash{Symbol => Integer}
按状态统计已完成任务
135 136 137 138 139 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 135 def completed_count_by_status @mutex.synchronize do @completed.group_by(&:status).transform_values(&:count) end end |
#completed_snapshot ⇒ Array<PindoTask>
返回已完成队列快照
123 124 125 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 123 def completed_snapshot @mutex.synchronize { @completed.dup } end |
#current ⇒ PindoTask?
获取当前任务
67 68 69 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 67 def current @mutex.synchronize { @current } end |
#find(task_id) ⇒ PindoTask?
查找任务(在所有队列中)
144 145 146 147 148 149 150 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 144 def find(task_id) @mutex.synchronize do all_tasks = @pending + @running + @completed all_tasks << @current if @current all_tasks.find { |t| t.id == task_id } end end |
#find_and_remove {|task| ... } ⇒ PindoTask?
查找并移除任务(原子操作)
51 52 53 54 55 56 57 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 51 def find_and_remove(&block) @mutex.synchronize do task = @pending.find(&block) @pending.delete(task) if task task end end |
#find_pending {|task| ... } ⇒ PindoTask?
条件查找(在待执行队列中)
203 204 205 206 207 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 203 def find_pending(&block) @mutex.synchronize do @pending.find(&block) end end |
#has_current? ⇒ Boolean
检查是否有当前任务
160 161 162 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 160 def has_current? @mutex.synchronize { !@current.nil? } end |
#mark_completed(task) ⇒ Object
添加到已完成队列
107 108 109 110 111 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 107 def mark_completed(task) @mutex.synchronize do @completed << task unless @completed.include?(task) end end |
#pending_count ⇒ Integer
获取待执行任务数量
166 167 168 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 166 def pending_count @mutex.synchronize { @pending.size } end |
#pending_empty? ⇒ Boolean
检查待执行队列是否为空
154 155 156 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 154 def pending_empty? @mutex.synchronize { @pending.empty? } end |
#pending_grouped_by_type ⇒ Hash{Symbol => Array<PindoTask>}
按类型分组待执行任务
129 130 131 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 129 def pending_grouped_by_type @mutex.synchronize { @pending.group_by(&:type) } end |
#pending_snapshot ⇒ Array<PindoTask>
返回待执行队列快照(不持锁遍历)
117 118 119 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 117 def pending_snapshot @mutex.synchronize { @pending.dup } end |
#remove_pending(task) ⇒ Boolean
移除待执行任务(用于标记失败/取消)
99 100 101 102 103 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 99 def remove_pending(task) @mutex.synchronize do @pending.delete(task) != nil end end |
#remove_running(task) ⇒ Boolean
移除正在运行的任务(并发执行使用)
80 81 82 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 80 def remove_running(task) @mutex.synchronize { @running.delete(task) != nil } end |
#set_current(task) ⇒ Object
设置当前任务(串行执行使用)
61 62 63 |
# File 'lib/pindo/module/task/core/task_queue.rb', line 61 def set_current(task) @mutex.synchronize { @current = task } end |