Class: DatWorkerPool::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/dat-worker-pool/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Queue

Returns a new instance of Queue.



9
10
11
12
13
14
15
16
17
# File 'lib/dat-worker-pool/queue.rb', line 9

# Builds an empty queue that is not shut down.
#
# Sets up the synchronization primitives (a mutex and a condition variable),
# the empty list of pending work items, and the two empty callback lists that
# #push and #pop iterate over.
def initialize
  # Synchronization primitives shared by push/pop/shutdown.
  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new

  # Queue state.
  @shutdown = false
  @work_items = []

  # Callables invoked after each push / pop.
  @on_push_callbacks = []
  @on_pop_callbacks = []
end

Instance Attribute Details

#on_pop_callbacks ⇒ Object

Returns the value of attribute on_pop_callbacks.



7
8
9
# File 'lib/dat-worker-pool/queue.rb', line 7

# Returns the object stored in @on_pop_callbacks (the constructor sets it to an
# empty array). #pop sends `call` to each element after it has taken an item
# off the queue, so callers append callables to the returned array.
def on_pop_callbacks
  @on_pop_callbacks
end

#on_push_callbacks ⇒ Object

Returns the value of attribute on_push_callbacks.



7
8
9
# File 'lib/dat-worker-pool/queue.rb', line 7

# Returns the object stored in @on_push_callbacks (the constructor sets it to
# an empty array). #push sends `call` to each element after it has appended the
# work item, so callers append callables to the returned array.
def on_push_callbacks
  @on_push_callbacks
end

Instance Method Details

#empty? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/dat-worker-pool/queue.rb', line 57

# Reports whether there are no pending work items.
#
# The check runs while holding the queue's mutex, the same lock #push and #pop
# take when they modify the list.
#
# @return [Boolean] true when no work items are queued
def empty?
  @mutex.synchronize do
    @work_items.empty?
  end
end

#pop ⇒ Object

  • Sleeps the current thread (`@condition_variable.wait(@mutex)`) until it is signaled via push or shutdown.



43
44
45
46
47
48
49
50
51
# File 'lib/dat-worker-pool/queue.rb', line 43

# Removes and returns the oldest work item, blocking while none is available.
#
# Returns nil immediately (without running callbacks) if the queue is already
# shut down when called. Otherwise the calling thread waits on the condition
# variable until an item is queued or the queue is shut down, takes the first
# item (nil if it was woken by shutdown with nothing queued), and then invokes
# every on-pop callback outside the lock.
#
# @return [Object, nil] the popped work item, or nil when nothing was taken
def pop
  return if @shutdown

  popped = @mutex.synchronize do
    # Re-check the condition after every wake-up.
    until @shutdown || !@work_items.empty?
      @condition_variable.wait(@mutex)
    end
    @work_items.shift
  end

  @on_pop_callbacks.each { |callback| callback.call }
  popped
end

#push(work_item) ⇒ Object

  • Adds the work item and wakes up the first thread waiting in pop (`@condition_variable.signal`).


32
33
34
35
36
37
38
39
# File 'lib/dat-worker-pool/queue.rb', line 32

# Appends a work item to the queue and wakes one thread waiting in #pop.
#
# After the item is queued (and the lock released), every on-push callback is
# invoked.
#
# @param work_item [Object] the item to enqueue
# @return [Array] the list of on-push callbacks (the result of iterating it)
# @raise [RuntimeError] if the queue has been shut down
def push(work_item)
  raise "Unable to add work while shutting down" if @shutdown

  @mutex.synchronize do
    @work_items.push(work_item)
    @condition_variable.signal
  end

  @on_push_callbacks.each { |callback| callback.call }
end

#shutdown ⇒ Object

  • Wakes up any threads (`@condition_variable.broadcast`) that are sleeping because of pop.



25
26
27
28
# File 'lib/dat-worker-pool/queue.rb', line 25

# Marks the queue as shut down and wakes every thread waiting in #pop.
#
# The flag is set first so that woken threads see it when they re-check their
# wait condition; the broadcast is issued while holding the mutex.
def shutdown
  @shutdown = true

  @mutex.synchronize do
    @condition_variable.broadcast
  end
end

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/dat-worker-pool/queue.rb', line 61

# Reports the current value of the @shutdown flag, which the constructor and
# #start set to false and #shutdown sets to true. The flag is read without
# taking the mutex.
def shutdown?
  @shutdown
end

#start ⇒ Object



19
20
21
# File 'lib/dat-worker-pool/queue.rb', line 19

# Clears the shutdown flag so that #push accepts work and #pop blocks for work
# again. Only the flag is changed; queued work items and callbacks are left
# untouched.
def start
  @shutdown = false
end

#work_items ⇒ Object



53
54
55
# File 'lib/dat-worker-pool/queue.rb', line 53

# Returns a snapshot of the pending work items, oldest first.
#
# The copy is made while holding the mutex. Handing out the internal array
# itself would defeat the lock: callers could iterate or mutate it while other
# threads are pushing to or shifting from it.
#
# @return [Array] a shallow copy of the queued work items
def work_items
  @mutex.synchronize{ @work_items.dup }
end