Class: LogStash::Util::WrappedSynchronousQueue::ReadBatch
- Inherits:
-
Object
- Object
- LogStash::Util::WrappedSynchronousQueue::ReadBatch
- Defined in:
- lib/logstash/util/wrapped_synchronous_queue.rb
Instance Method Summary
- #cancel(event) ⇒ Object
- #cancelled_size ⇒ Object
- #each(&blk) ⇒ Object
- #filtered_size ⇒ Object
-
#initialize(queue, size, wait) ⇒ ReadBatch
constructor
A new instance of ReadBatch.
- #merge(event) ⇒ Object
- #read_next ⇒ Object
- #size ⇒ Object
- #starting_size ⇒ Object
Constructor Details
#initialize(queue, size, wait) ⇒ ReadBatch
Returns a new instance of ReadBatch.
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 174
#
# Sets up an empty batch bound to +queue+. No events are read here.
#
# @param queue [Object] source queue, stored as-is in @queue
# @param size [Integer] stored in @size
# @param wait [Object] stored in @wait
# @return [ReadBatch] a new instance of ReadBatch
def initialize(queue, size, wait)
  @queue = queue
  @size = size
  @wait = wait

  @originals = {}

  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  # @cancelled = Hash.new

  @generated = {}
  # NOTE(review): the two names below were missing from the extracted listing
  # and are restored from the comments in #merge / #each - confirm against the
  # upstream file.
  @iterating_temp = {}
  @iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
  @acked_batch = nil
end
Instance Method Details
#cancel(event) ⇒ Object
211 212 213 214 215 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 211
#
# Cancelling through the batch is currently disabled; always raises.
#
# @param event [Object] ignored
# @raise [RuntimeError] always, with the message "cancel is unsupported"
def cancel(event)
  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  raise "cancel is unsupported"
  # @cancelled[event] = true
end
#cancelled_size ⇒ Object
245 246 247 248 249 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 245
#
# Counting cancelled events is currently disabled; always raises.
#
# @raise [RuntimeError] always, with the message "cancelled_size is unsupported "
def cancelled_size
  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  raise "cancelled_size is unsupported "
  # @cancelled.size
end
#each(&blk) ⇒ Object
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 217
#
# Calls +blk+ with every event of @originals, then every event of @generated,
# skipping events whose #cancelled? is truthy.
#
# @iterating is true for the duration of both loops so that #merge can divert
# newly produced events away from the hashes being walked. Once the loops
# finish the flag is cleared and update_generated is called.
#
# @yieldparam e [Object] an event responding to #cancelled?
# @return [Object] whatever update_generated returns (defined outside this
#   listing; presumably it folds @iterating_temp into @generated - confirm)
def each(&blk)
  # take care not to cause @originals or @generated to change during iteration
  @iterating = true

  # below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
  # TODO: for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  @originals.each_key do |e|
    blk.call(e) unless e.cancelled?
  end
  @generated.each_key do |e|
    blk.call(e) unless e.cancelled?
  end

  @iterating = false
  update_generated
end
#filtered_size ⇒ Object
241 242 243 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 241
#
# Number of events currently held: entries in @originals plus entries in
# @generated. Cancelled events are not subtracted.
#
# @return [Integer]
def filtered_size
  @originals.size + @generated.size
end
#merge(event) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 199
#
# Records +event+ as generated by this batch. A nil event, or one that is
# already a key of @originals, is ignored.
#
# @param event [Object, nil] the event to record
# @return [true, nil] true when the event was recorded, nil when it was ignored
def merge(event)
  return if event.nil? || @originals.key?(event)

  # take care not to cause @generated to change during iteration
  # @iterating_temp is merged after the iteration
  if @iterating
    @iterating_temp[event] = true
  else
    # the periodic flush could generate events outside of an each iteration
    @generated[event] = true
  end
end
#read_next ⇒ Object
190 191 192 193 194 195 196 197 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 190
#
# Polls @queue up to @size times, passing @wait to each poll, and stores every
# returned event as a key of @originals. Stops at the first poll that returns
# nil.
#
# @return [Integer, nil] @size when every poll produced an event, nil when a
#   poll returned nil
def read_next
  @size.times do
    event = @queue.poll(@wait)
    return if event.nil? # queue poll timed out

    @originals[event] = true
  end
end
#size ⇒ Object
233 234 235 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 233
#
# Same value as #filtered_size.
#
# @return [Object] whatever #filtered_size returns
def size
  filtered_size
end
#starting_size ⇒ Object
237 238 239 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 237
#
# Number of entries in @originals; events held in @generated are not counted.
#
# @return [Integer]
def starting_size
  @originals.size
end