Class: WorkQueue

Inherits:
Object
Defined in:
lib/work_queue.rb

Overview

A tunable work queue, designed to coordinate work between a producer and a pool of worker threads.

Constant Summary

VERSION = "2.5.4"

Instance Method Summary

Constructor Details

#initialize(max_threads = nil, max_tasks = nil) ⇒ WorkQueue

Creates an empty work queue with the desired parameters. It’s generally recommended to bound the resources used.

Parameter(s)

  • max_threads - Maximum number of worker threads (nil for no limit).

  • max_tasks - Maximum number of queued tasks (nil for no limit).

Example(s)

wq = WorkQueue.new 10, nil
wq = WorkQueue.new nil, 20
wq = WorkQueue.new 10, 20


# File 'lib/work_queue.rb', line 23

def initialize(max_threads = nil, max_tasks = nil)
  self.max_threads = max_threads
  self.max_tasks = max_tasks
  @threads = []
  @threads_waiting = 0
  @threads.extend MonitorMixin
  @tasks = []
  @tasks.extend MonitorMixin
  @task_enqueued = @tasks.new_cond
  @task_dequeued = @tasks.new_cond
  @task_completed = @tasks.new_cond
  @tasks_pending = 0
end
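
The examples further down show max_threads and max_tasks reporting Infinity when the corresponding argument is nil. The setters that do this are not shown on this page, so the following is only a sketch of how such a default could be applied. The helper name normalize_limit and the validation rule are assumptions made for illustration, not part of WorkQueue.

```ruby
# Hypothetical helper: maps a nil limit to Infinity, as the documented
# max_threads / max_tasks examples suggest. The positive-integer check is
# an assumption, not confirmed behaviour of WorkQueue.
def normalize_limit(value)
  return Float::INFINITY if value.nil?
  unless value.is_a?(Integer) && value > 0
    raise ArgumentError, "limit must be a positive integer or nil"
  end
  value
end

p normalize_limit(nil)  # unbounded
p normalize_limit(10)   # bounded
```

Representing "no limit" as Float::INFINITY lets comparisons such as size < limit work without a special case for nil.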

Instance Method Details

#cur_tasks ⇒ Object

Returns the current number of queued tasks. This value is just a snapshot, and may change immediately upon returning.

Example(s)

wq = WorkQueue.new 1
wq.enqueue_b { ... }
wq.cur_tasks		#=> 0
wq.enqueue_b { ... }
wq.cur_tasks		#=> 1


# File 'lib/work_queue.rb', line 90

def cur_tasks
  @tasks.size
end

#cur_threads ⇒ Object

Returns the current number of worker threads. This value is just a snapshot, and may change immediately upon returning.

Example(s)

wq = WorkQueue.new 1
wq.cur_threads		#=> 0
wq.enqueue_b { ... }
wq.cur_threads		#=> 1


# File 'lib/work_queue.rb', line 61

def cur_threads
  @threads.size
end

#enqueue_b(*params, &block) ⇒ Object

Schedules the given Block for future execution by a worker thread. If there is no space left in the queue, waits until space becomes available.

Parameter(s)

  • params - Parameters passed to the given block.

Example(s)

wq = WorkQueue.new
wq.enqueue_b("Parameter") { |obj| ... }


# File 'lib/work_queue.rb', line 105

def enqueue_b(*params, &block)
  enqueue block, params
end
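
The blocking behaviour described above (the producer waits while the queue is full) can be illustrated with Ruby's standard SizedQueue. This is a stand-in for demonstration only: WorkQueue manages its own array and condition variables, and the method name run_bounded is hypothetical.

```ruby
# Stdlib analogue of a bounded task queue. SizedQueue#push blocks while the
# queue already holds max_tasks items, which mirrors the documented
# "waits until space becomes available" behaviour.
def run_bounded(max_tasks, inputs, &block)
  tasks = SizedQueue.new(max_tasks)
  results = Queue.new

  # Single worker: pops [callable, params] pairs until it sees the nil sentinel.
  worker = Thread.new do
    while (task = tasks.pop)
      callable, params = task
      results << callable.call(*params)
    end
  end

  inputs.each { |n| tasks.push([block, [n]]) } # blocks whenever the queue is full
  tasks.push(nil)                              # sentinel: no more work
  worker.join

  Array.new(results.size) { results.pop }
end

p run_bounded(1, [1, 2, 3]) { |x| x * 2 }
```

With a limit of 1, the producer can never run more than one task ahead of the worker, which is the resource bound the constructor documentation recommends.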

#enqueue_p(proc, *params) ⇒ Object

Schedules the given Proc for future execution by a worker thread. If there is no space left in the queue, waits until space becomes available.

Parameter(s)

  • proc - Proc to be executed.

  • params - Parameters passed to the given proc.

Example(s)

wq = WorkQueue.new
wq.enqueue_p(Proc.new { |obj| ... }, "Parameter")


# File 'lib/work_queue.rb', line 121

def enqueue_p(proc, *params)
  enqueue proc, params
end
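
Both enqueue_b and enqueue_p hand a callable and a parameter array to the same internal enqueue method, so the two forms differ only in how the callable is supplied. The sketch below shows that equivalence with a hypothetical TaskRecorder class that merely stores tasks and runs them later on the calling thread; it is not WorkQueue and spawns no worker threads.

```ruby
# Hypothetical stand-in that records tasks instead of scheduling them.
class TaskRecorder
  def initialize
    @tasks = []
  end

  # Block form: the block becomes the callable.
  def enqueue_b(*params, &block)
    enqueue block, params
  end

  # Proc form: the first argument is the callable.
  def enqueue_p(proc, *params)
    enqueue proc, params
  end

  # Runs every recorded task in order, splatting its parameters.
  def run_all
    @tasks.map { |callable, params| callable.call(*params) }
  end

  private

  def enqueue(callable, params)
    @tasks << [callable, params]
  end
end

recorder = TaskRecorder.new
recorder.enqueue_b("block") { |obj| "got #{obj}" }
recorder.enqueue_p(Proc.new { |obj| "got #{obj}" }, "proc")
p recorder.run_all
```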

#join ⇒ Object

Waits until the task queue is empty and all worker threads have finished.

Example(s)

wq = WorkQueue.new
wq.enqueue_b { ... }
wq.join


# File 'lib/work_queue.rb', line 133

def join
  @tasks.synchronize do
    @task_completed.wait_while { @tasks_pending > 0 }
  end
end
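
The wait in join relies on a monitor condition variable: the caller sleeps inside wait_while until another thread changes the counter and signals. A minimal, self-contained sketch of that pattern follows. PendingCounter and its method names are hypothetical, and the real class keeps this state on @tasks rather than on a separate Monitor.

```ruby
require 'monitor'

# Hypothetical counter showing the wait_while / broadcast pattern used by join.
class PendingCounter
  def initialize
    @lock = Monitor.new
    @completed = @lock.new_cond
    @pending = 0
  end

  # Called when a task is accepted.
  def start
    @lock.synchronize { @pending += 1 }
  end

  # Called when a task finishes; wakes any thread blocked in join.
  def finish
    @lock.synchronize do
      @pending -= 1
      @completed.broadcast
    end
  end

  # Blocks until no tasks are pending. wait_while releases the lock while
  # sleeping and rechecks the condition each time it is woken.
  def join
    @lock.synchronize do
      @completed.wait_while { @pending > 0 }
    end
  end

  def pending
    @lock.synchronize { @pending }
  end
end

counter = PendingCounter.new
workers = 3.times.map do
  counter.start
  Thread.new do
    sleep 0.01
    counter.finish
  end
end
counter.join
puts counter.pending
workers.each(&:join)
```

Using wait_while rather than a bare wait guards against waking up before the count has actually reached zero.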

#kill ⇒ Object

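Halts all worker threads immediately, aborting any ongoing tasks. Resets all internal state: the worker threads and queued tasks are cleared, and the pending-task count returns to zero.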

Example(s)

wq = WorkQueue.new
wq.enqueue_b { ... }
wq.kill


# File 'lib/work_queue.rb', line 148

def kill
  @tasks.synchronize do
    @threads.synchronize do
      @threads.each(&:exit)
      @threads.clear
      @threads_waiting = 0
    end
    @tasks.clear
    @tasks_pending = 0
    @task_dequeued.broadcast
    @task_completed.broadcast
  end
end
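
The effect of calling exit on each worker can be seen in isolation with plain threads. This is a sketch rather than WorkQueue's own code: the helper kill_all is hypothetical, and the extra join calls are added here only so the example can observe that the threads have stopped.

```ruby
# Hypothetical helper showing what Thread#exit does to running threads.
def kill_all(threads)
  threads.each(&:exit)  # request termination; ensure blocks in the thread still run
  threads.each(&:join)  # wait until each thread has actually stopped
  threads.clear
end

cleaned_up = Queue.new
threads = 2.times.map do
  Thread.new do
    begin
      sleep               # simulate a long-running task
    ensure
      cleaned_up << true  # runs even though the task was aborted
    end
  end
end
handles = threads.dup
sleep 0.05                # give the threads time to enter the begin block
kill_all(threads)
p handles.map(&:alive?)
p cleaned_up.size
```

Because the task is aborted partway through, any work it had not yet finished is lost; only cleanup placed in an ensure block still runs.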

#max_tasks ⇒ Object

Returns the maximum number of queued tasks. This value is set upon initialization and cannot be changed afterwards.

Example(s)

wq = WorkQueue.new
wq.max_tasks		#=> Infinity
wq = WorkQueue.new nil, 1
wq.max_tasks		#=> 1


# File 'lib/work_queue.rb', line 75

def max_tasks
  @max_tasks
end

#max_threads ⇒ Object

Returns the maximum number of worker threads. This value is set upon initialization and cannot be changed afterwards.

Example(s)

wq = WorkQueue.new
wq.max_threads		#=> Infinity
wq = WorkQueue.new 1, nil
wq.max_threads		#=> 1


# File 'lib/work_queue.rb', line 47

def max_threads
  @max_threads
end