Class: PulsarSdk::Tweaks::TimeoutQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar_sdk/tweaks/timeout_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TimeoutQueue

Returns a new instance of TimeoutQueue.



4
5
6
7
8
9
# File 'lib/pulsar_sdk/tweaks/timeout_queue.rb', line 4

# Builds an empty queue that is open for use, together with the lock and
# condition variable the other methods use to coordinate threads.
def initialize
  @closed = false                     # set to true by #close
  @queue = []                         # pending entries; #add appends to the tail
  @mutex = Mutex.new                  # guards @queue
  @received = ConditionVariable.new   # waited on by #pop, signalled by #add
end

Instance Method Details

#add(*args) ⇒ Object



11
12
13
14
15
16
# File 'lib/pulsar_sdk/tweaks/timeout_queue.rb', line 11

# Appends one entry to the tail of the queue and signals the condition
# variable so that a thread waiting in #pop can pick it up.
#
# All arguments are stored together as a single Array: +add(a, b)+ enqueues
# +[a, b]+, and +add+ with no arguments enqueues +[]+.
#
# @param args [Array<Object>] the values that make up the entry
# @return [Object] the result of ConditionVariable#signal
def add(*args)
  @mutex.synchronize do
    @queue.push(args)
    # Signal while the lock is still held, so the append and the wake-up are
    # seen together by a consumer checking the queue under the same lock.
    @received.signal
  end
end

#clear ⇒ Object



18
19
20
21
22
# File 'lib/pulsar_sdk/tweaks/timeout_queue.rb', line 18

# Discards every pending entry while holding the queue lock.
#
# @return [Array] the (now empty) internal array, as returned by Array#clear
def clear
  @mutex.synchronize { @queue.clear }
end

#close ⇒ Object



46
47
48
49
# File 'lib/pulsar_sdk/tweaks/timeout_queue.rb', line 46

# Marks the queue as closed and wakes every thread blocked in #pop.
#
# Entries that are already queued are left in place; #pop stops waiting once
# the closed flag is set.
#
# The flag is set and the broadcast is sent while holding the queue lock.
# #pop checks @closed under that same lock just before it waits; if this
# method ran without the lock, it could slip in between that check and the
# wait, the broadcast would reach nobody, and the consumer would then sleep
# with nothing left to wake it.
#
# NOTE(review): Mutex#synchronize cannot be used inside a signal trap
# handler — confirm no caller closes the queue from one.
#
# @return [Object] the result of ConditionVariable#broadcast
def close
  @mutex.synchronize do
    @closed = true
    @received.broadcast
  end
end

#pop(timeout = nil) ⇒ Object

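timeout: the number of seconds to wait for an entry. nil (the default) waits until an entry arrives or the queue is closed; 0 returns immediately without waiting.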



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/pulsar_sdk/tweaks/timeout_queue.rb', line 29

# Removes and returns the oldest entry, optionally waiting for one to arrive.
#
# Entries come out in the order they were appended (first in, first out).
#
# @param timeout [Numeric, nil] how long to wait, in seconds, when the queue
#   is empty. +nil+ waits until an entry arrives or the queue is closed;
#   +0+ does not wait at all.
# @return [Array, nil] the entry at the head of the queue, or +nil+ when the
#   queue is still empty after the wait ended (timeout elapsed or queue closed)
def pop(timeout = nil)
  @mutex.synchronize do
    if timeout.nil?
      # Loop rather than wait once: the condition is re-checked after every
      # wake-up, so a wake-up with nothing to take just waits again.
      @received.wait(@mutex) while !@closed && @queue.empty?
    elsif @queue.empty? && timeout != 0
      # A monotonic clock keeps the deadline correct even if the system
      # wall clock is adjusted while a consumer is waiting.
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      while !@closed && @queue.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break unless remaining > 0

        @received.wait(@mutex, remaining)
      end
    end

    # Take from the head so the oldest entry is delivered first; nil when empty.
    @queue.shift
  end
end

#size ⇒ Object



24
25
26
# File 'lib/pulsar_sdk/tweaks/timeout_queue.rb', line 24

# Number of entries currently held in the queue.
#
# The count is read without taking the queue lock, so when other threads are
# adding or popping it is only a snapshot.
#
# @return [Integer]
def size
  @queue.length
end