Class: DatWorkerPool::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/dat-worker-pool/queue.rb

Instance Method Summary collapse

Constructor Details

#initializeQueue

Returns a new instance of Queue.



7
8
9
10
11
12
# File 'lib/dat-worker-pool/queue.rb', line 7

def initialize
  @work_items = []
  @shutdown = false
  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
end

Instance Method Details

#empty?Boolean

Returns:



32
33
34
# File 'lib/dat-worker-pool/queue.rb', line 32

def empty?
  @mutex.synchronize{ @work_items.empty? }
end

#popObject



28
29
30
# File 'lib/dat-worker-pool/queue.rb', line 28

def pop
  @mutex.synchronize{ @work_items.shift }
end

#push(work_item) ⇒ Object

Add the work_item and wake up the first worker (the signal) that’s waiting (because of wait_for_work_item)



20
21
22
23
24
25
26
# File 'lib/dat-worker-pool/queue.rb', line 20

def push(work_item)
  raise "Unable to add work while shutting down" if @shutdown
  @mutex.synchronize do
    @work_items << work_item
    @condition_variable.signal
  end
end

#shutdownObject

wake up any workers who are idle (because of wait_for_work_item)



43
44
45
46
# File 'lib/dat-worker-pool/queue.rb', line 43

def shutdown
  @shutdown = true
  @mutex.synchronize{ @condition_variable.broadcast }
end

#wait_for_work_itemObject

wait to be signaled by push



37
38
39
40
# File 'lib/dat-worker-pool/queue.rb', line 37

def wait_for_work_item
  return if @shutdown
  @mutex.synchronize{ @condition_variable.wait(@mutex) }
end

#work_itemsObject



14
15
16
# File 'lib/dat-worker-pool/queue.rb', line 14

def work_items
  @mutex.synchronize{ @work_items }
end