Class: LogCourier::EventQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/event_queue.rb

Direct Known Subclasses

ClientZmq

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max) ⇒ EventQueue

Creates a fixed-length queue with a maximum size of max.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/log-courier/event_queue.rb', line 35

# Creates a fixed-length queue that holds at most +max+ objects.
#
# @param max [Integer] maximum number of queued objects; must be greater
#   than zero
# @raise [ArgumentError] if +max+ is not greater than zero
def initialize(max)
  fail ArgumentError, "queue size must be positive" unless max > 0
  @max = max

  # Object#taint was deprecated in Ruby 2.7 and removed in Ruby 3.2, where
  # calling it raises NoMethodError, so neither the backing array nor the
  # queue itself is tainted any more.
  @que = []

  # A single lock guards the queue. #push waits on @enque_cond while the
  # queue is full and signals @cond after adding an object.
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @enque_cond = ConditionVariable.new

  # Waiter counters reported by #num_waiting.
  @num_waiting = 0
  @num_enqueue_waiting = 0
end

Instance Attribute Details

#max ⇒ Object

Returns the maximum size of the queue.



53
54
55
# File 'lib/log-courier/event_queue.rb', line 53

# Returns the maximum size of the queue, as stored in @max.
#
# @return [Object] the current value of @max
def max
  @max
end

Instance Method Details

#clear ⇒ Object

Removes all objects from the queue.



144
145
146
147
# File 'lib/log-courier/event_queue.rb', line 144

# Removes all objects from the queue and wakes every thread that is blocked
# in #push waiting for free space.
#
# @return [self]
def clear
  # Hold the same lock #push uses so the queue is never emptied while a
  # producer is between its capacity check and its wait.
  @mutex.synchronize do
    @que.clear
    # The whole queue is free now, so wake all waiting producers; each one
    # re-checks the capacity before it actually pushes.
    @enque_cond.broadcast
  end
  self
end

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


137
138
139
# File 'lib/log-courier/event_queue.rb', line 137

# Reports whether the queue currently holds no objects.
#
# @return [Boolean] true when nothing is queued, false otherwise
def empty?
  @que.length.zero?
end

#length ⇒ Object Also known as: size

Returns the length of the queue.



152
153
154
# File 'lib/log-courier/event_queue.rb', line 152

# Returns the number of objects currently held in the queue.
#
# @return [Integer] the current element count of the backing array
def length
  @que.size
end

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



164
165
166
# File 'lib/log-courier/event_queue.rb', line 164

# Returns the number of threads waiting on the queue: the sum of the
# @num_waiting and @num_enqueue_waiting counters.
#
# @return [Integer] total of both waiter counters
def num_waiting
  # @num_enqueue_waiting counts threads blocked in #push waiting for space.
  blocked_pushers = @num_enqueue_waiting
  # NOTE(review): @num_waiting is presumably maintained by the dequeue side,
  # which is not shown alongside this method -- confirm.
  other_waiters = @num_waiting
  other_waiters + blocked_pushers
end

#pop(*args) ⇒ Object Also known as: shift, deq

Retrieves data from the queue and runs a waiting thread, if any.



114
115
116
117
118
119
120
121
122
# File 'lib/log-courier/event_queue.rb', line 114

# Retrieves data from the queue and then wakes one thread waiting in #push,
# if the queue has free space.
#
# The retrieval itself is delegated to pop_timeout, which receives +args+
# unchanged.
#
# @param args [Array] arguments forwarded verbatim to pop_timeout
# @return [Object] whatever pop_timeout returned
def pop(*args)
  popped = pop_timeout(*args)
  # Only signal when there really is room below @max for a producer to use.
  @mutex.synchronize { @enque_cond.signal if @que.length < @max }
  popped
end

#push(obj, timeout = nil) ⇒ Object Also known as: <<, enq

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available, up to a maximum of timeout seconds.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/log-courier/event_queue.rb', line 79

# Pushes +obj+ onto the queue. If there is no space left in the queue, waits
# until space becomes available, up to a maximum of +timeout+ seconds.
#
# @param obj [Object] the object to append
# @param timeout [Numeric, nil] maximum number of seconds to wait for free
#   space; nil waits indefinitely
# @return [self]
# @raise [TimeoutError] if the queue is still full once +timeout+ seconds
#   have elapsed
def push(obj, timeout = nil)
  # Track the deadline on the monotonic clock so that wall-clock adjustments
  # can neither shorten nor extend the wait.
  deadline = timeout.nil? ? nil : Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  @mutex.synchronize do
    # Capacity is re-checked before the deadline on every pass, so a push
    # that is woken right at the deadline still succeeds when room exists.
    while @que.length >= @max
      remaining = nil
      unless deadline.nil?
        # Wait only for the time that is left, so repeated wake-ups cannot
        # stretch the total blocking time beyond +timeout+.
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        fail TimeoutError if remaining <= 0
      end

      @num_enqueue_waiting += 1
      begin
        @enque_cond.wait @mutex, remaining
      ensure
        # Keep the waiter count accurate even if the wait is interrupted.
        @num_enqueue_waiting -= 1
      end
    end

    @que.push obj
    @cond.signal
  end
  self
end