Class: WorkQueue
- Inherits: Object
- Defined in: lib/work_queue.rb
Overview
A tunable work queue, designed to coordinate work between a producer and a pool of worker threads.
Constant Summary
- VERSION = "2.5.4"
Instance Method Summary
-
#cur_tasks ⇒ Object
Returns the current number of queued tasks.
-
#cur_threads ⇒ Object
Returns the current number of worker threads.
-
#enqueue_b(*params, &block) ⇒ Object
Schedules the given Block for future execution by a worker thread.
-
#enqueue_p(proc, *params) ⇒ Object
Schedules the given Proc for future execution by a worker thread.
-
#initialize(max_threads = nil, max_tasks = nil) ⇒ WorkQueue
constructor
Creates an empty work queue with the desired parameters.
-
#join ⇒ Object
Waits until the task queue is empty and all worker threads have finished.
-
#kill ⇒ Object
Halts all worker threads immediately, aborting any ongoing tasks.
-
#max_tasks ⇒ Object
Returns the maximum number of queued tasks.
-
#max_threads ⇒ Object
Returns the maximum number of worker threads.
Constructor Details
#initialize(max_threads = nil, max_tasks = nil) ⇒ WorkQueue
Creates an empty work queue with the desired parameters. It’s generally recommended to bound the resources used.
Parameter(s)
- max_threads: Maximum number of worker threads.
- max_tasks: Maximum number of queued tasks.
Example(s)
wq = WorkQueue.new 10, nil
wq = WorkQueue.new nil, 20
wq = WorkQueue.new 10, 20
# File 'lib/work_queue.rb', line 23
def initialize(max_threads = nil, max_tasks = nil)
  self.max_threads = max_threads
  self.max_tasks = max_tasks
  @threads = []
  @threads_waiting = 0
  @threads.extend MonitorMixin
  @tasks = []
  @tasks.extend MonitorMixin
  @task_enqueued = @tasks.new_cond
  @task_dequeued = @tasks.new_cond
  @task_completed = @tasks.new_cond
  @tasks_pending = 0
end
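The constructor extends plain arrays with MonitorMixin and derives condition variables from them. The following is a minimal sketch of that pattern in isolation, using only Ruby's standard `monitor` library. The method name `monitor_array_demo` and the `:job` payload are illustrative assumptions and are not part of WorkQueue.

```ruby
require 'monitor'

# Hypothetical demo: an Array that carries its own lock and condition variable.
def monitor_array_demo
  tasks = []
  tasks.extend(MonitorMixin)       # the array now has a re-entrant lock
  task_enqueued = tasks.new_cond   # condition variable bound to that lock

  consumer = Thread.new do
    tasks.synchronize do
      # Releases the lock and sleeps until the array is non-empty.
      task_enqueued.wait_while { tasks.empty? }
      tasks.shift
    end
  end

  tasks.synchronize do
    tasks << :job
    task_enqueued.signal           # wake one waiting consumer
  end

  consumer.value                   # joins the thread and returns its result
end

p monitor_array_demo  # prints :job
```

Because `wait_while` re-checks its block before sleeping, the consumer gets the item whether it starts before or after the producer.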
Instance Method Details
#cur_tasks ⇒ Object
Returns the current number of queued tasks. This value is just a snapshot, and may change immediately upon returning.
Example(s)
wq = WorkQueue.new 1
wq.enqueue_b { ... }
wq.cur_tasks #=> 0
wq.enqueue_b { ... }
wq.cur_tasks #=> 1
# File 'lib/work_queue.rb', line 90
def cur_tasks
  @tasks.size
end
#cur_threads ⇒ Object
Returns the current number of worker threads. This value is just a snapshot, and may change immediately upon returning.
Example(s)
wq = WorkQueue.new 1
wq.cur_threads #=> 0
wq.enqueue_b { ... }
wq.cur_threads #=> 1
# File 'lib/work_queue.rb', line 61
def cur_threads
  @threads.size
end
#enqueue_b(*params, &block) ⇒ Object
Schedules the given Block for future execution by a worker thread. If there is no space left in the queue, waits until space becomes available.
Parameter(s)
- params: Parameters passed to the given block.
Example(s)
wq = WorkQueue.new
wq.enqueue_b("Parameter") { |obj| ... }
# File 'lib/work_queue.rb', line 105
def enqueue_b(*params, &block)
  enqueue block, params
end
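The private `enqueue` method is not shown on this page, so the blocking behavior described above can only be sketched. The class below, `BoundedBuffer`, is a hypothetical stand-in and not the WorkQueue implementation. It reuses the MonitorMixin and `@task_dequeued` naming from the constructor to show one way a producer can wait for free space.

```ruby
require 'monitor'

# Hypothetical bounded buffer; an assumption-based sketch, not WorkQueue itself.
class BoundedBuffer
  def initialize(max_tasks)
    @max_tasks = max_tasks
    @tasks = []
    @tasks.extend(MonitorMixin)
    @task_dequeued = @tasks.new_cond
  end

  # Blocks the caller while the buffer is full.
  def push(task)
    @tasks.synchronize do
      @task_dequeued.wait_while { @tasks.size >= @max_tasks }
      @tasks << task
    end
  end

  # Removes one task and wakes a producer that may be waiting for space.
  def pop
    @tasks.synchronize do
      task = @tasks.shift
      @task_dequeued.signal
      task
    end
  end

  def size
    @tasks.synchronize { @tasks.size }
  end
end

buffer = BoundedBuffer.new(1)
buffer.push(:first)
producer = Thread.new { buffer.push(:second) } # cannot complete: buffer is full
sleep 0.1
puts producer.alive?  # true, the producer is still waiting for space
buffer.pop            # frees a slot
producer.join
puts buffer.size      # 1
```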
#enqueue_p(proc, *params) ⇒ Object
Schedules the given Proc for future execution by a worker thread. If there is no space left in the queue, waits until space becomes available.
Parameter(s)
- proc: Proc to be executed.
- params: Parameters passed to the given proc.
Example(s)
wq = WorkQueue.new
wq.enqueue_p(Proc.new { |obj| ... }, "Parameter")
# File 'lib/work_queue.rb', line 121
def enqueue_p(proc, *params)
  enqueue proc, params
end
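Both `enqueue_b` and `enqueue_p` forward a callable plus a parameter list. The sketch below shows why the two forms are interchangeable: a block captured with `&block` is itself a Proc. The helpers `run_block` and `run_proc` are hypothetical and call the code immediately instead of scheduling it on a worker thread.

```ruby
# Hypothetical helpers that mirror the shape of the documented signatures.
def run_block(*params, &block)
  block.call(*params)   # &block has already turned the block into a Proc
end

def run_proc(proc, *params)
  proc.call(*params)
end

shout = Proc.new { |s| s.upcase }

from_block = run_block("parameter") { |s| s.upcase }
from_proc  = run_proc(shout, "parameter")

puts from_block  # PARAMETER
puts from_proc   # PARAMETER
```

The block form reads more naturally for one-off tasks, while the Proc form suits a callable that is built once and enqueued many times.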
#join ⇒ Object
Waits until the task queue is empty and all worker threads have finished.
Example(s)
wq = WorkQueue.new
wq.enqueue_b { ... }
wq.join
# File 'lib/work_queue.rb', line 133
def join
  @tasks.synchronize do
    @task_completed.wait_while { @tasks_pending > 0 }
  end
end
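`join` waits on a condition variable until a pending-task counter reaches zero. The worker-side code that updates this counter is not shown on this page, so the sketch below assumes one plausible arrangement. `PendingTracker` and its `started` and `finished` methods are hypothetical names used only for illustration.

```ruby
require 'monitor'

# Hypothetical tracker that models only the counter join waits on.
class PendingTracker
  def initialize
    @lock = Monitor.new
    @task_completed = @lock.new_cond
    @tasks_pending = 0
  end

  # Called when a task is handed over for execution.
  def started
    @lock.synchronize { @tasks_pending += 1 }
  end

  # Called by a worker when its task is done; wakes every waiter.
  def finished
    @lock.synchronize do
      @tasks_pending -= 1
      @task_completed.broadcast
    end
  end

  # Same shape as the documented join: sleep while work is outstanding.
  def join
    @lock.synchronize do
      @task_completed.wait_while { @tasks_pending > 0 }
    end
  end
end

tracker = PendingTracker.new
results = Queue.new
3.times do |i|
  tracker.started
  Thread.new do
    results << i * i
    tracker.finished
  end
end
tracker.join
puts results.size  # 3, every task reported before join returned
```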
#kill ⇒ Object
Halts all worker threads immediately, aborting any ongoing tasks. Resets the internal state: the thread list, the task list, and their counters.
Example(s)
wq = WorkQueue.new
wq.enqueue_b { ... }
wq.kill
# File 'lib/work_queue.rb', line 148
def kill
  @tasks.synchronize do
    @threads.synchronize do
      @threads.each(&:exit)
      @threads.clear
      @threads_waiting = 0
    end
    @tasks.clear
    @tasks_pending = 0
    @task_dequeued.broadcast
    @task_completed.broadcast
  end
end
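`kill` calls `exit` on every worker thread. The sketch below uses a plain Thread as a stand-in worker to show what that means for a task in progress: the code after the interruption point never runs. The method name `kill_demo` and the `progress` markers are illustrative assumptions.

```ruby
# Hypothetical demo with a plain Thread, not a WorkQueue worker.
def kill_demo
  progress = []
  worker = Thread.new do
    progress << :started
    sleep                    # simulates a long-running task
    progress << :finished    # never reached once the thread is exited
  end

  sleep 0.01 until progress.include?(:started)
  worker.exit                # the same call kill applies via @threads.each(&:exit)
  worker.join                # wait for the thread to actually terminate
  progress
end

p kill_demo  # prints [:started]
```

Since an aborted task gets no chance to finish its own work, `kill` suits shutdown paths where partial results are acceptable, whereas `join` suits an orderly drain.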
#max_tasks ⇒ Object
Returns the maximum number of queued tasks.
# File 'lib/work_queue.rb', line 75
def max_tasks
  @max_tasks
end
#max_threads ⇒ Object
Returns the maximum number of worker threads.
# File 'lib/work_queue.rb', line 47
def max_threads
  @max_threads
end