Class: LogStash::Util::WrappedAckedQueue::ReadBatch

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_acked_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue, size, wait) ⇒ ReadBatch

Returns a new instance of ReadBatch.



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 243

def initialize(queue, size, wait)
  @queue = queue
  @size = size
  @wait = wait

  @originals = Hash.new

  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  # @cancelled = Hash.new

  @generated = Hash.new
  @iterating_temp = Hash.new
  @iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
  @acked_batch = nil
end

Instance Method Details

#cancel(event) ⇒ Object



284
285
286
287
288
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 284

def cancel(event)
  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  raise("cancel is unsupported")
  # @cancelled[event] = true
end

#cancelled_sizeObject



324
325
326
327
328
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 324

def cancelled_size
  # TODO: disabled for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
  raise("cancelled_size is unsupported ")
  # @cancelled.size
end

#closeObject



265
266
267
268
269
270
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 265

def close
  # this will ack the whole batch, regardless of whether some
  # events were cancelled or failed
  return if @acked_batch.nil?
  @acked_batch.close
end

#each(&blk) ⇒ Object



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 296

def each(&blk)
  # take care not to cause @originals or @generated to change during iteration

  # below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
  # TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
  @iterating = true
  @originals.each do |e, _|
    blk.call(e) unless e.cancelled?
  end
  @generated.each do |e, _|
    blk.call(e) unless e.cancelled?
  end
  @iterating = false
  update_generated
end

#filtered_sizeObject



320
321
322
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 320

def filtered_size
  @originals.size + @generated.size
end

#flush_signal_received?Boolean

Returns:

  • (Boolean)


334
335
336
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 334

def flush_signal_received?
  false
end

#merge(event) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 272

def merge(event)
  return if event.nil? || @originals.key?(event)
  # take care not to cause @generated to change during iteration
  # @iterating_temp is merged after the iteration
  if iterating?
    @iterating_temp[event] = true
  else
    # the periodic flush could generate events outside of an each iteration
    @generated[event] = true
  end
end

#read_nextObject



259
260
261
262
263
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 259

def read_next
  @acked_batch = @queue.read_batch(@size, @wait)
  return if @acked_batch.nil?
  @acked_batch.get_elements.each { |e| @originals[e] = true }
end

#shutdown_signal_received?Boolean

Returns:

  • (Boolean)


330
331
332
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 330

def shutdown_signal_received?
  false
end

#sizeObject



312
313
314
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 312

def size
  filtered_size
end

#starting_sizeObject



316
317
318
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 316

def starting_size
  @originals.size
end

#to_aObject



290
291
292
293
294
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 290

def to_a
  events = []
  each {|e| events << e}
  events
end