Class: SyncQueue
- Inherits:
-
Object
- Object
- SyncQueue
- Defined in:
- lib/devp2p/sync_queue.rb
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #clear ⇒ Object
- #deq(non_block = false) ⇒ Object
- #empty? ⇒ Boolean
- #enq(obj, non_block = false) ⇒ Object (also: #<<)
- #full? ⇒ Boolean
-
#initialize(max_size = nil) ⇒ SyncQueue
constructor
A new instance of SyncQueue.
- #length ⇒ Object (also: #size)
-
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
-
#peek(non_block = false) ⇒ Object
Same as pop except it will not remove the element from queue, just peek.
Constructor Details
#initialize(max_size = nil) ⇒ SyncQueue
Returns a new instance of SyncQueue.
9 10 11 12 13 14 15 16 17 18 |
# File 'lib/devp2p/sync_queue.rb', line 9 def initialize(max_size=nil) @queue = [] @num_waiting = 0 @max_size = max_size @mutex = Mutex.new @cond_full = ConditionVariable.new @cond_empty = ConditionVariable.new end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
7 8 9 |
# File 'lib/devp2p/sync_queue.rb', line 7 def queue @queue end |
Instance Method Details
#clear ⇒ Object
103 104 105 |
# File 'lib/devp2p/sync_queue.rb', line 103 def clear @queue.clear end |
#deq(non_block = false) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/devp2p/sync_queue.rb', line 46 def deq(non_block=false) Thread.handle_interrupt(StandardError => :on_blocking) do loop do @mutex.synchronize do if empty? if non_block raise ThreadError, 'queue empty' else begin @num_waiting += 1 @cond_empty.wait @mutex ensure @num_waiting -= 1 end end else obj = @queue.shift @cond_full.signal return obj end end end end end |
#empty? ⇒ Boolean
99 100 101 |
# File 'lib/devp2p/sync_queue.rb', line 99 def empty? @queue.empty? end |
#enq(obj, non_block = false) ⇒ Object Also known as: <<
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/devp2p/sync_queue.rb', line 20 def enq(obj, non_block=false) Thread.handle_interrupt(StandardError => :on_blocking) do loop do @mutex.synchronize do if full? if non_block raise ThreadError, 'queue full' else begin @num_waiting += 1 @cond_full.wait @mutex ensure @num_waiting -= 1 end end else @queue.push obj @cond_empty.signal return obj end end end end end |
#full? ⇒ Boolean
95 96 97 |
# File 'lib/devp2p/sync_queue.rb', line 95 def full? @max_size && @queue.size >= @max_size end |
#length ⇒ Object Also known as: size
107 108 109 |
# File 'lib/devp2p/sync_queue.rb', line 107 def length @queue.length end |
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
113 114 115 |
# File 'lib/devp2p/sync_queue.rb', line 113 def num_waiting @num_waiting end |
#peek(non_block = false) ⇒ Object
Same as pop except it will not remove the element from queue, just peek.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/devp2p/sync_queue.rb', line 72 def peek(non_block=false) Thread.handle_interrupt(StandardError => :on_blocking) do loop do @mutex.synchronize do if empty? if non_block raise ThreadError, 'queue empty' else begin @num_waiting += 1 @cond_empty.wait @mutex ensure @num_waiting -= 1 end end else return @queue[0] end end end end end |