Class: Redborder::KafkaNotifier::MessageQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka_notifier/message_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(limit = 500, timeout = 1) ⇒ MessageQueue

Returns a new instance of MessageQueue.



19
20
21
22
23
24
25
# File 'lib/kafka_notifier/message_queue.rb', line 19

# Builds an empty queue together with the lock and condition variable
# that guard it.
#
# @param limit [Integer] queue size at which #<< signals a waiting consumer
#   and the batch size used by #pop_all
# @param timeout [Numeric] timeout passed to ConditionVariable#wait by
#   #pop_all (seconds)
def initialize(limit = 500, timeout = 1)
  # Plain configuration first, then the shared state and its guards.
  @timeout = timeout
  @limit = limit
  @queue = []
  @mutex = Mutex.new
  # NOTE: the (misspelled) name is kept as-is because the other methods of
  # this class reference the condition variable under this exact name.
  @recieved = ConditionVariable.new
end

Instance Method Details

#<<(x) ⇒ Object



27
28
29
30
31
32
# File 'lib/kafka_notifier/message_queue.rb', line 27

# Appends a message to the queue. Once the queue holds at least @limit
# messages, one thread waiting on the condition variable is woken up.
#
# @param x [Object] the message to enqueue
# @return [self] the queue itself, so pushes can be chained
#   (`queue << a << b`), matching Array#<< and Thread::Queue#<<
def <<(x)
  @mutex.synchronize do
    @queue << x
    # Only signal when a full batch is available; below the limit the
    # consumer relies on its own wait timeout instead.
    @recieved.signal if @queue.size >= @limit
  end
  self
end

#pop_all ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/kafka_notifier/message_queue.rb', line 34

# Blocks until at least one message is queued, then removes and returns a
# batch of at most @limit messages, oldest first.
#
# If fewer than @limit messages are queued, the method waits once more on
# the condition variable (until it is signalled or @timeout elapses) to
# give a fuller batch the chance to accumulate. Whatever arrived during
# that wait is still capped at @limit; the remainder stays queued for the
# next call.
#
# Assumes @limit is a positive Integer.
#
# @return [Array] up to @limit messages; may be empty if another consumer
#   drained the queue while this one was waiting for a fuller batch
def pop_all
  @mutex.synchronize do
    # Re-check after every wake-up: a timeout also returns from #wait.
    @recieved.wait(@mutex, @timeout) while @queue.empty?

    # Grace period for a partial batch to fill up.
    @recieved.wait(@mutex, @timeout) if @queue.size < @limit

    # Array#shift(n) removes and returns at most n leading elements, which
    # covers both the "full batch" and the "take everything" cases.
    @queue.shift(@limit)
  end
end

#size ⇒ Object



52
53
54
55
56
# File 'lib/kafka_notifier/message_queue.rb', line 52

# Reports how many messages are currently queued.
#
# The count is read while holding the queue's mutex.
#
# @return [Integer] number of queued messages
def size
  @mutex.synchronize { @queue.length }
end