Class: LogStash::Util::WrappedSynchronousQueue::ReadBatch

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_synchronous_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue, size, wait) ⇒ ReadBatch

Returns a new instance of ReadBatch.



129
130
131
132
133
134
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 129

# Builds a read batch by draining events from the given queue.
#
# The three arguments are forwarded unchanged to LsQueueUtils.drain and the
# returned collection is kept in @originals.
#
# @param queue the queue to drain from
# @param size  presumably the maximum number of events to take - TODO confirm against LsQueueUtils.drain
# @param wait  presumably how long to wait for events - TODO confirm units against LsQueueUtils.drain
def initialize(queue, size, wait)
  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  # Cancellation bookkeeping is not stored on the batch while this is disabled.
  # @cancelled = Hash.new

  @originals = LsQueueUtils.drain(queue, size, wait)
end

Instance Method Details

#each(&blk) ⇒ Object



147
148
149
150
151
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 147

# Yields every event of the batch that has not been cancelled.
#
# When called without a block an Enumerator over the same (non-cancelled)
# events is returned instead of failing on the missing block.
#
# @yieldparam event an event for which `cancelled?` is falsy
# @return [Enumerator] when no block is given; otherwise whatever
#   `@originals.each` returns
def each(&blk)
  # Without this guard a non-empty batch would raise NoMethodError on nil.call.
  return to_enum(:each) unless blk

  # below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
  # TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
  @originals.each { |event| blk.call(event) unless event.cancelled? }
end

#filtered_size ⇒ Object Also known as: size



153
154
155
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 153

# Returns the number of events held in @originals.
#
# NOTE(review): this is the raw size of @originals; it does not exclude
# events for which `cancelled?` is true, whereas #each and #to_a skip
# those - confirm whether that is intended.
def filtered_size
  @originals.size
end

#merge(event) ⇒ Object



136
137
138
139
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 136

# Adds an event to the batch; a nil event is ignored.
#
# @param event the event to append to @originals
# @return nil when event is nil, otherwise the result of `@originals.add`
def merge(event)
  @originals.add(event) unless event.nil?
end

#to_a ⇒ Object



141
142
143
144
145
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 141

# Collects the batch's non-cancelled events into a new Array, preserving
# the iteration order of @originals.
#
# @return [Array] events for which `cancelled?` is falsy
def to_a
  [].tap do |live|
    @originals.each do |event|
      next if event.cancelled?

      live << event
    end
  end
end