Class: LogCourier::EventQueue
- Inherits:
- Object
- Object
- LogCourier::EventQueue
- Defined in:
- lib/log-courier/event_queue.rb
Overview
EventQueue
Instance Attribute Summary
- #max ⇒ Object
Returns the maximum size of the queue.
Instance Method Summary
- #clear ⇒ Object
Removes all objects from the queue.
- #empty? ⇒ Boolean
Returns true if the queue is empty.
- #initialize(max) ⇒ EventQueue (constructor)
Creates a fixed-length queue with a maximum size of max.
- #length ⇒ Object (also: #size)
Returns the length of the queue.
- #num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
- #pop(*args) ⇒ Object (also: #shift, #deq)
Retrieves data from the queue and runs a waiting thread, if any.
- #push(obj, timeout = nil) ⇒ Object (also: #<<, #enq)
Pushes obj to the queue.
Constructor Details
#initialize(max) ⇒ EventQueue
Creates a fixed-length queue with a maximum size of max.
# File 'lib/log-courier/event_queue.rb', line 34
def initialize(max)
  raise ArgumentError, 'queue size must be positive' unless max.positive?

  @max = max
  @enque_cond = ConditionVariable.new
  @num_enqueue_waiting = 0
  @que = []
  @que.taint # enable tainted communication
  @num_waiting = 0
  taint
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end
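The guard at the top of the constructor relies on `positive?`, so its behaviour depends on what is passed as max. The sketch below isolates that one line in a hypothetical helper (`check_max` is not part of LogCourier) to show which inputs are rejected and how; it assumes nothing beyond core Ruby.

```ruby
# Hypothetical helper mirroring the documented guard; not part of LogCourier.
def check_max(max)
  raise ArgumentError, 'queue size must be positive' unless max.positive?

  max
end

# Try a valid size, two non-positive sizes, and a value without #positive?.
results = [10, 0, -1, nil].map do |candidate|
  begin
    check_max(candidate)
  rescue ArgumentError, NoMethodError => e
    e.class
  end
end
# results => [10, ArgumentError, ArgumentError, NoMethodError]
```

Zero and negative sizes raise ArgumentError, while a value that does not respond to `positive?` (such as nil) fails earlier with NoMethodError.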
Instance Attribute Details
#max ⇒ Object
Returns the maximum size of the queue.
# File 'lib/log-courier/event_queue.rb', line 52
def max
  @max
end
Instance Method Details
#clear ⇒ Object
Removes all objects from the queue.
# File 'lib/log-courier/event_queue.rb', line 142
def clear
  @mutex.synchronize do
    @que.clear
  end
  self
end
#empty? ⇒ Boolean
Returns true if the queue is empty.
# File 'lib/log-courier/event_queue.rb', line 133
def empty?
  @mutex.synchronize do
    return @que.empty?
  end
end
#length ⇒ Object Also known as: size
Returns the length of the queue.
# File 'lib/log-courier/event_queue.rb', line 152
def length
  @mutex.synchronize do
    return @que.length
  end
end
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
# File 'lib/log-courier/event_queue.rb', line 166
def num_waiting
  @mutex.synchronize do
    return @num_waiting + @num_enqueue_waiting
  end
end
#pop(*args) ⇒ Object Also known as: shift, deq
Retrieves data from the queue and runs a waiting thread, if any.
# File 'lib/log-courier/event_queue.rb', line 112
def pop(*args)
  retval = pop_timeout(*args)
  @mutex.synchronize do
    @enque_cond.signal if @que.length < @max
  end
  retval
end
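To illustrate what "runs a waiting thread" means in practice, the sketch below uses Ruby's built-in `SizedQueue` as a stand-in. This is an assumption: it presumes that EventQueue's untimed push and pop block and wake threads the same way `SizedQueue` does, which the aliases suggest but this page does not confirm.

```ruby
# Illustration with Ruby's built-in SizedQueue (assumed analogue, not EventQueue).
queue = SizedQueue.new(1)
queue.push(:a)                    # queue is now full

producer = Thread.new do
  queue.push(:b)                  # blocks until a slot is free
  :pushed
end

# Wait until the producer is actually parked on the full queue.
sleep 0.01 until queue.num_waiting == 1

first = queue.pop                 # removes :a and wakes the producer
result = producer.value           # joins the thread; returns :pushed
second = queue.pop                # the item the producer was waiting to add
```

The producer only completes once the first pop frees a slot, which is the role `@enque_cond.signal` plays in the listing above.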
#push(obj, timeout = nil) ⇒ Object Also known as: <<, enq
Pushes obj to the queue. If the queue is full, waits until space becomes available, up to a maximum of timeout seconds, and raises TimeoutError if that time elapses first. With the default timeout of nil, the wait has no time limit.
# File 'lib/log-courier/event_queue.rb', line 77
def push(obj, timeout = nil)
  start = Time.now unless timeout.nil?
  @mutex.synchronize do
    loop do
      break if @que.length < @max

      @num_enqueue_waiting += 1
      begin
        @enque_cond.wait @mutex, timeout
      ensure
        @num_enqueue_waiting -= 1
      end

      raise TimeoutError if !timeout.nil? && Time.now - start >= timeout
    end

    @que.push obj
    @cond.signal
  end
  self
end
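The timed wait can be tried out without the gem using a small stand-in. The class below is a hypothetical sketch (`SketchQueue` and `SketchTimeoutError` are invented names, not LogCourier code): it copies the wait loop from the listing above and adds a simplified, non-blocking `pop` so that the example is self-contained.

```ruby
# Hypothetical stand-in for illustration only; not LogCourier::EventQueue.
class SketchTimeoutError < StandardError; end

class SketchQueue
  def initialize(max)
    raise ArgumentError, 'queue size must be positive' unless max.positive?

    @max = max
    @que = []
    @mutex = Mutex.new
    @enque_cond = ConditionVariable.new
  end

  # Same wait loop as the documented #push: block while full, give up
  # once `timeout` seconds have elapsed since the call started.
  def push(obj, timeout = nil)
    start = Time.now unless timeout.nil?
    @mutex.synchronize do
      while @que.length >= @max
        @enque_cond.wait(@mutex, timeout)
        raise SketchTimeoutError if !timeout.nil? && Time.now - start >= timeout
      end
      @que.push(obj)
    end
    self
  end

  # Simplified pop (assumption): non-blocking, wakes one waiting producer.
  def pop
    @mutex.synchronize do
      item = @que.shift
      @enque_cond.signal
      item
    end
  end

  def length
    @mutex.synchronize { @que.length }
  end
end

queue = SketchQueue.new(1)
queue.push(:first)              # fits immediately

timed_out = false
begin
  queue.push(:second, 0.05)     # queue is full and nobody pops
rescue SketchTimeoutError
  timed_out = true
end

queue.pop                       # frees the single slot
queue.push(:third, 0.05)        # now succeeds without waiting
```

Because `push` returns `self`, calls can be chained when no timeout handling is needed; callers that pass a timeout should be prepared to rescue the timeout error.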