Class: LogStash::Util::WrappedSynchronousQueue::ReadBatch
- Inherits: Object
  - Object
  - LogStash::Util::WrappedSynchronousQueue::ReadBatch
- Defined in: lib/logstash/util/wrapped_synchronous_queue.rb
Instance Method Summary
- #each(&blk) ⇒ Object
- #filtered_size ⇒ Object (also: #size)
- #initialize(queue, size, wait) ⇒ ReadBatch (constructor): A new instance of ReadBatch.
- #merge(event) ⇒ Object
- #to_a ⇒ Object
Constructor Details
#initialize(queue, size, wait) ⇒ ReadBatch
Returns a new instance of ReadBatch.
    # File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 129
    def initialize(queue, size, wait)
      # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
      # @cancelled = Hash.new

      @originals = LsQueueUtils.drain(queue, size, wait)
    end
Instance Method Details
#each(&blk) ⇒ Object
    # File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 147
    def each(&blk)
      # below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
      # TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
      @originals.each {|e| blk.call(e) unless e.cancelled?}
    end
#filtered_size ⇒ Object
Also known as: size
    # File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 153
    def filtered_size
      @originals.size
    end
#merge(event) ⇒ Object
    # File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 136
    def merge(event)
      return if event.nil?
      @originals.add(event)
    end
#to_a ⇒ Object
    # File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 141
    def to_a
      events = []
      @originals.each {|e| events << e unless e.cancelled?}
      events
    end
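The listings above suggest that #each and #to_a skip cancelled events, while #filtered_size (and its alias #size) returns the size of the underlying collection without checking cancellation. The following is a minimal sketch of that behavior, not the Logstash implementation: SketchEvent and SketchReadBatch are hypothetical stand-ins, and the batch is seeded from a plain Array rather than drained from a queue through LsQueueUtils.drain, which is not shown on this page.

```ruby
# Hypothetical stand-in for a Logstash event; only #cancelled? is needed here.
SketchEvent = Struct.new(:message, :cancelled) do
  def cancelled?
    cancelled
  end
end

# Hypothetical stand-in that mirrors the ReadBatch methods listed above.
# Assumption: it is seeded from an Array instead of LsQueueUtils.drain.
class SketchReadBatch
  def initialize(events)
    @originals = events.dup
  end

  # Ignores nil, otherwise appends the event to the batch.
  def merge(event)
    return if event.nil?
    @originals << event # the original calls @originals.add(event)
  end

  # Returns only the events that have not been cancelled.
  def to_a
    events = []
    @originals.each { |e| events << e unless e.cancelled? }
    events
  end

  # Yields only the events that have not been cancelled.
  def each(&blk)
    @originals.each { |e| blk.call(e) unless e.cancelled? }
  end

  # Counts every stored event, cancelled or not.
  def filtered_size
    @originals.size
  end
  alias_method :size, :filtered_size
end

batch = SketchReadBatch.new([
  SketchEvent.new("a", false),
  SketchEvent.new("b", true) # cancelled
])
batch.merge(nil)                          # no effect
batch.merge(SketchEvent.new("c", false))

seen = []
batch.each { |e| seen << e.message }
```

In this sketch, each and to_a see only the events "a" and "c", while filtered_size reports all three stored events, so a caller that needs the number of live events would have to count the result of to_a instead.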