Class: LogStash::SizedQueueTimeout

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/sized_queue_timeout.rb

Overview

Minimal subset implement of a SizedQueue supporting a timeout option on the lock.

This will be part of the main Logstash’s sized queue

Defined Under Namespace

Classes: TimeoutError

Constant Summary collapse

DEFAULT_TIMEOUT =

in seconds

5

Instance Method Summary collapse

Constructor Details

#initialize(max_size, options = {}) ⇒ SizedQueueTimeout

Returns a new instance of SizedQueueTimeout.



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/logstash/sized_queue_timeout.rb', line 14

def initialize(max_size, options = {})
  # `concurrent-ruby` are deprecating the `Condition`
  # in favor of a Synchonization class that you need to implement.
  # this was bit overkill to only check if the wait did a timeout.
  @condition_in = ConditionVariable.new
  @condition_out = ConditionVariable.new

  @max_size = max_size
  @queue = []
  @mutex = Mutex.new
end

Instance Method Details

#pop_no_timeoutObject



48
49
50
51
52
53
54
55
56
57
# File 'lib/logstash/sized_queue_timeout.rb', line 48

def pop_no_timeout
  @mutex.synchronize do
    @condition_in.wait(@mutex) while @queue.empty? # Wake up check

    obj = @queue.shift
    @condition_out.signal

    return obj
  end
end

#push(obj, timeout = DEFAULT_TIMEOUT) ⇒ Object Also known as: <<



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/logstash/sized_queue_timeout.rb', line 26

def push(obj, timeout = DEFAULT_TIMEOUT)
  @mutex.synchronize do
    while full? # wake up check
      start_time = Concurrent.monotonic_time
      @condition_out.wait(@mutex, timeout) 
      if start_time + timeout - Concurrent.monotonic_time  < 0
        raise TimeoutError
      end
    end

    @queue << obj
    @condition_in.signal

    return obj
  end
end

#sizeObject



44
45
46
# File 'lib/logstash/sized_queue_timeout.rb', line 44

def size
  @mutex.synchronize { @queue.size }
end