Class: LogStash::Util::WrappedAckedQueue::ReadBatch
- Inherits:
- Object
- Inheritance chain: Object → LogStash::Util::WrappedAckedQueue::ReadBatch
- Defined in:
- lib/logstash/util/wrapped_acked_queue.rb
Instance Method Summary collapse
- #cancel(event) ⇒ Object
- #cancelled_size ⇒ Object
- #close ⇒ Object
- #each(&blk) ⇒ Object
- #filtered_size ⇒ Object
- #flush_signal_received? ⇒ Boolean
- #initialize(queue, size, wait) ⇒ ReadBatch (constructor): A new instance of ReadBatch.
- #merge(event) ⇒ Object
- #read_next ⇒ Object
- #shutdown_signal_received? ⇒ Boolean
- #size ⇒ Object
- #starting_size ⇒ Object
- #to_a ⇒ Object
Constructor Details
#initialize(queue, size, wait) ⇒ ReadBatch
Returns a new instance of ReadBatch.
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 243
#
# Sets up an empty batch. Nothing is read from the queue here; the three
# arguments are only stored for later use.
#
# @param queue [Object] stored in @queue
# @param size [Object] stored in @size
# @param wait [Object] stored in @wait
def initialize(queue, size, wait)
  @queue, @size, @wait = queue, size, wait

  # All three hashes are used as ordered sets: the event is the key and the
  # value is always +true+.
  @originals = {}
  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  # @cancelled = Hash.new
  @generated = {}
  @iterating_temp = {}
  @iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
  @acked_batch = nil
end
Instance Method Details
#cancel(event) ⇒ Object
284 285 286 287 288 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 284
#
# Cancelling through the batch is currently switched off; this method
# raises unconditionally.
#
# @param event [Object] ignored
# @raise [RuntimeError] always, with the message "cancel is unsupported"
def cancel(event)
  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  # @cancelled[event] = true
  raise RuntimeError, "cancel is unsupported"
end
#cancelled_size ⇒ Object
324 325 326 327 328 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 324
#
# Counting cancelled events is currently switched off; this method raises
# unconditionally.
#
# @raise [RuntimeError] always
def cancelled_size
  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  # @cancelled.size
  raise RuntimeError, "cancelled_size is unsupported "
end
#close ⇒ Object
265 266 267 268 269 270 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 265
#
# Closes the underlying acked batch, if one has been read.
#
# @return [Object, nil] nil when no batch is held, otherwise whatever the
#   batch's own #close returns
def close
  # this will ack the whole batch, regardless of whether some
  # events were cancelled or failed
  @acked_batch.close unless @acked_batch.nil?
end
#each(&blk) ⇒ Object
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 296
#
# Yields every non-cancelled event: first the keys of @originals, then the
# keys of @generated. While the iteration runs, @iterating is true so that
# #merge diverts new events into @iterating_temp instead of mutating
# @generated under the iterator.
#
# @param blk [Proc] called once per event for which +cancelled?+ is falsy
# @return [Object] whatever update_generated returns
def each(&blk)
  # take care not to cause @originals or @generated to change during iteration

  # below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
  # TODO: for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  @iterating = true
  begin
    @originals.each_key { |e| blk.call(e) unless e.cancelled? }
    @generated.each_key { |e| blk.call(e) unless e.cancelled? }
  ensure
    # Always drop the flag, even when the block raises; otherwise every later
    # #merge would keep parking events in @iterating_temp.
    @iterating = false
  end
  # Only reached on normal completion.
  # NOTE(review): update_generated is defined outside this view; presumably it
  # folds @iterating_temp into @generated - confirm.
  update_generated
end
#filtered_size ⇒ Object
320 321 322 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 320
#
# Number of events currently held: entries in @originals plus entries in
# @generated. Cancelled events are not subtracted.
#
# @return [Integer]
def filtered_size
  @originals.length + @generated.length
end
#flush_signal_received? ⇒ Boolean
334 335 336 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 334
#
# This batch type never reports a flush signal.
#
# @return [Boolean] always false
def flush_signal_received?
  false
end
#merge(event) ⇒ Object
272 273 274 275 276 277 278 279 280 281 282 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 272
#
# Adds a newly produced event to the batch. Nil events and events that are
# already keys of @originals are ignored.
#
# @param event [Object, nil] the event to add
# @return [true, nil] nil when the event was ignored, true once it is stored
def merge(event)
  return if event.nil? || @originals.key?(event)

  # take care not to cause @generated to change during iteration
  # @iterating_temp is merged after the iteration
  if @iterating
    @iterating_temp[event] = true
  else
    # the periodic flush could generate events outside of an each iteration
    @generated[event] = true
  end
end
#read_next ⇒ Object
259 260 261 262 263 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 259
#
# Asks @queue for the next batch via read_batch(@size, @wait), keeps it in
# @acked_batch, and registers each of its elements as a key of @originals.
#
# @return [Object, nil] nil when the queue returned no batch, otherwise the
#   collection returned by the batch's get_elements
def read_next
  @acked_batch = @queue.read_batch(@size, @wait)
  return if @acked_batch.nil?

  @acked_batch.get_elements.each do |element|
    @originals[element] = true
  end
end
#shutdown_signal_received? ⇒ Boolean
330 331 332 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 330
#
# This batch type never reports a shutdown signal.
#
# @return [Boolean] always false
def shutdown_signal_received?
  false
end
#size ⇒ Object
312 313 314 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 312
#
# Same value as #filtered_size; this method simply delegates to it.
#
# @return [Object] whatever filtered_size returns
def size
  filtered_size
end
#starting_size ⇒ Object
316 317 318 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 316
#
# Number of entries in @originals.
#
# @return [Integer]
def starting_size
  @originals.length
end
#to_a ⇒ Object
290 291 292 293 294 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 290
#
# Collects everything #each yields into a new Array, in yield order.
#
# @return [Array]
def to_a
  [].tap do |collected|
    each { |event| collected << event }
  end
end