Class: LogStash::SizedQueueTimeout
- Inherits: Object
  - Object
    - LogStash::SizedQueueTimeout
- Defined in: lib/logstash/sized_queue_timeout.rb
Overview
Minimal implementation of a subset of SizedQueue, with a timeout option on the blocking push.
This will become part of Logstash's main sized queue.
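A minimal usage sketch of the interface documented on this page. So that it runs without Logstash, it defines a stand-in class that mirrors the documented methods using only Ruby's standard library. The names `SketchQueue` and `monotonic_now` are hypothetical, and `Process.clock_gettime` is used here in place of `Concurrent.monotonic_time`. With Logstash loaded, the same calls would presumably be made on `LogStash::SizedQueueTimeout`.

```ruby
# Stand-in for LogStash::SizedQueueTimeout so the sketch is self-contained.
# SketchQueue and monotonic_now are hypothetical names, not library API.
class SketchQueue
  class TimeoutError < StandardError; end

  DEFAULT_TIMEOUT = 5 # seconds

  def initialize(max_size)
    @max_size = max_size
    @queue = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new # signalled after a push
    @not_full = ConditionVariable.new  # signalled after a pop
  end

  # Blocks while the queue is full; raises TimeoutError once a wait
  # has lasted at least `timeout` seconds.
  def push(obj, timeout = DEFAULT_TIMEOUT)
    @mutex.synchronize do
      while @queue.size >= @max_size
        start_time = monotonic_now
        @not_full.wait(@mutex, timeout)
        raise TimeoutError if monotonic_now - start_time >= timeout
      end
      @queue << obj
      @not_empty.signal
      obj
    end
  end
  alias_method :<<, :push

  # Blocks, with no timeout, until an item is available.
  def pop_no_timeout
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @queue.empty?
      obj = @queue.shift
      @not_full.signal
      obj
    end
  end

  def size
    @mutex.synchronize { @queue.size }
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

queue = SketchQueue.new(2)
queue << :a     # alias of #push
queue.push(:b)  # the queue is now full

# No consumer is running, so a third push gives up after about 0.1 seconds.
timed_out = begin
  queue.push(:c, 0.1)
  false
rescue SketchQueue::TimeoutError
  true
end

first = queue.pop_no_timeout # items come out in FIFO order
```

A caller would typically rescue `TimeoutError` around `#push` and decide whether to retry, drop the item, or shut down.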
Defined Under Namespace
Classes: TimeoutError
Constant Summary
- DEFAULT_TIMEOUT = 5 (in seconds)
Instance Method Summary
- #initialize(max_size, options = {}) ⇒ SizedQueueTimeout (constructor): A new instance of SizedQueueTimeout.
- #pop_no_timeout ⇒ Object
- #push(obj, timeout = DEFAULT_TIMEOUT) ⇒ Object (also: #<<)
- #size ⇒ Object
Constructor Details
#initialize(max_size, options = {}) ⇒ SizedQueueTimeout
Returns a new instance of SizedQueueTimeout.
# File 'lib/logstash/sized_queue_timeout.rb', line 14
def initialize(max_size, options = {})
  # `concurrent-ruby` is deprecating `Condition` in favor of a
  # Synchronization class that you need to implement.
  # That was a bit overkill just to check whether the wait timed out.
  @condition_in = ConditionVariable.new
  @condition_out = ConditionVariable.new
  @max_size = max_size
  @queue = []
  @mutex = Mutex.new
end
Instance Method Details
#pop_no_timeout ⇒ Object
# File 'lib/logstash/sized_queue_timeout.rb', line 48
def pop_no_timeout
  @mutex.synchronize do
    @condition_in.wait(@mutex) while @queue.empty? # Wake up check
    obj = @queue.shift
    @condition_out.signal
    return obj
  end
end
#push(obj, timeout = DEFAULT_TIMEOUT) ⇒ Object Also known as: <<
# File 'lib/logstash/sized_queue_timeout.rb', line 26
def push(obj, timeout = DEFAULT_TIMEOUT)
  @mutex.synchronize do
    while full? # wake up check
      start_time = Concurrent.monotonic_time
      @condition_out.wait(@mutex, timeout)
      if start_time + timeout - Concurrent.monotonic_time < 0
        raise TimeoutError
      end
    end
    @queue << obj
    @condition_in.signal
    return obj
  end
end
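In the listing above, `start_time` is read again on every pass through the `while` loop, so a wakeup that finds the queue still full begins a fresh wait of `timeout` seconds. Where a hard upper bound on the total wait matters, one common alternative is to compute a single deadline up front and wait only for the time remaining. The following is a sketch of that technique, not the Logstash implementation. `wait_until` is a hypothetical helper, and `Process.clock_gettime` stands in for `Concurrent.monotonic_time`.

```ruby
# Hypothetical helper, not part of Logstash. Must be called with `mutex` held.
# Waits on `condition` until the block returns true or until `timeout`
# seconds have passed in total. Returns true on success, false on timeout.
def wait_until(mutex, condition, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if remaining <= 0
    # An early or spurious wakeup only consumes part of the remaining budget.
    condition.wait(mutex, remaining)
  end
  true
end

mutex = Mutex.new
slot_freed = ConditionVariable.new
items = [:a, :b]
max_size = 2

# Nobody removes an item, so this gives up after roughly 0.1 seconds.
gave_up = mutex.synchronize do
  !wait_until(mutex, slot_freed, 0.1) { items.size < max_size }
end

# A consumer thread frees a slot, so this wait succeeds before its deadline.
consumer = Thread.new do
  sleep 0.05
  mutex.synchronize do
    items.shift
    slot_freed.signal
  end
end

got_slot = mutex.synchronize do
  wait_until(mutex, slot_freed, 2) { items.size < max_size }
end
consumer.join
```

Returning a boolean leaves the choice of raising, retrying, or dropping to the caller. A `#push` built on this helper could raise `TimeoutError` whenever it returns false.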
#size ⇒ Object
# File 'lib/logstash/sized_queue_timeout.rb', line 44
def size
  @mutex.synchronize { @queue.size }
end