Class: LogCourier::EventQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/event_queue.rb

Overview

EventQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max) ⇒ EventQueue

Creates a fixed-length queue with a maximum size of max.

Raises:

  • (ArgumentError)


34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/log-courier/event_queue.rb', line 34

# Creates a fixed-length queue with a maximum size of +max+.
#
# @param max [#positive?] the capacity of the queue; must be positive
# @raise [ArgumentError] if +max+ is not positive
def initialize(max)
  raise ArgumentError, 'queue size must be positive' unless max.positive?

  @max = max

  # Guards the queue contents and the waiter counters below.
  @mutex = Mutex.new

  # Backing store for queued objects.
  @que = []

  # Signalled by #push each time an object is added.
  # NOTE(review): presumably waited on by the consumer side (pop_timeout,
  # not shown here) -- confirm.
  @cond = ConditionVariable.new
  @num_waiting = 0

  # Producers blocked in #push on a full queue wait on this; #pop signals it
  # once there is room again.
  @enque_cond = ConditionVariable.new
  @num_enqueue_waiting = 0

  # The former `@que.taint` / `taint` calls were dropped on purpose:
  # Object#taint has been a no-op since Ruby 2.7 and no longer exists in
  # Ruby 3.2+, where calling it raises NoMethodError.
end

Instance Attribute Details

#max ⇒ Object

Returns the maximum size of the queue.



52
53
54
# File 'lib/log-courier/event_queue.rb', line 52

# Returns the maximum size of the queue, as stored in @max.
def max
  @max
end

Instance Method Details

#clear ⇒ Object

Removes all objects from the queue.



142
143
144
145
146
147
# File 'lib/log-courier/event_queue.rb', line 142

# Empties the queue while holding the queue mutex.
#
# @return [self] the queue itself, so calls can be chained
def clear
  @mutex.synchronize { @que.clear }
  self
end

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


133
134
135
136
137
# File 'lib/log-courier/event_queue.rb', line 133

# Reports whether the queue currently holds no objects. The check is made
# while holding the queue mutex.
#
# @return [Boolean] true if the queue is empty
def empty?
  @mutex.synchronize { @que.empty? }
end

#length ⇒ Object Also known as: size

Returns the length of the queue.



152
153
154
155
156
# File 'lib/log-courier/event_queue.rb', line 152

# Counts the objects currently held in the queue. The count is taken while
# holding the queue mutex.
#
# @return [Integer] the number of queued objects
def length
  @mutex.synchronize { @que.size }
end

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



166
167
168
169
170
# File 'lib/log-courier/event_queue.rb', line 166

# Returns the number of threads waiting on the queue: the sum of the
# @num_waiting and @num_enqueue_waiting counters, read together under the
# queue mutex.
#
# @return [Integer] the combined waiter count
def num_waiting
  @mutex.synchronize { @num_enqueue_waiting + @num_waiting }
end

#pop(*args) ⇒ Object Also known as: shift, deq

Retrieves data from the queue and runs a waiting thread, if any.



112
113
114
115
116
117
118
# File 'lib/log-courier/event_queue.rb', line 112

# Retrieves data from the queue and wakes a waiting producer, if any.
#
# All arguments are forwarded unchanged to pop_timeout, which performs the
# actual retrieval. Once it has returned, one thread blocked on @enque_cond
# is signalled, provided the queue is below its maximum size.
#
# @param args arguments passed straight through to pop_timeout
# @return [Object] whatever pop_timeout returned
def pop(*args)
  item = pop_timeout(*args)
  @mutex.synchronize do
    # Only wake a producer when there is actually room for its object.
    @enque_cond.signal unless @que.length >= @max
  end
  item
end

#push(obj, timeout = nil) ⇒ Object Also known as: <<, enq

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available, up to a maximum of timeout seconds.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/log-courier/event_queue.rb', line 77

# Pushes +obj+ to the queue. If there is no space left in the queue, waits
# until space becomes available, up to a maximum of +timeout+ seconds in
# total.
#
# The deadline is computed once, from the monotonic clock, and each wait is
# armed with only the time still remaining. A wake-up that finds the queue
# still full therefore cannot extend the overall wait, and wall-clock
# adjustments do not affect it. Room in the queue is checked before the
# deadline, so a push never fails while space is available.
#
# @param obj [Object] the object to enqueue
# @param timeout [Numeric, nil] maximum number of seconds to wait for space;
#   nil waits indefinitely
# @return [self] the queue itself
# @raise [TimeoutError] if the queue is still full once +timeout+ has elapsed
def push(obj, timeout = nil)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout unless timeout.nil?

  @mutex.synchronize do
    while @que.length >= @max
      remaining = nil
      unless deadline.nil?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError if remaining <= 0
      end

      @num_enqueue_waiting += 1
      begin
        # Releases @mutex while sleeping and re-acquires it before returning;
        # a nil timeout sleeps until signalled.
        @enque_cond.wait @mutex, remaining
      ensure
        # Keep the waiter count accurate even if the wait is interrupted.
        @num_enqueue_waiting -= 1
      end
    end

    @que.push obj
    # Announce the new object to a thread waiting on @cond.
    @cond.signal
  end
  self
end