Method: Fluent::Plugin::Buffer#enqueue_all

Defined in:
lib/fluent/plugin/buffer.rb

#enqueue_all(force_enqueue = false) ⇒ Object

When flush_at_shutdown is in effect, all staged chunks should be enqueued for buffer flush. Pass true as force_enqueue in that case so that chunks are enqueued even when the queue is full.



535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
# File 'lib/fluent/plugin/buffer.rb', line 535

# Enqueues the chunks currently held in @stage.
#
# The stage keys are snapshotted inside `synchronize`, and the snapshot is
# then walked outside of it. Before each key is processed the queue is
# checked: unless +force_enqueue+ is true, the method returns as soon as
# `queue_full?` reports true, leaving the remaining keys staged.
#
# With a block, each staged chunk is offered to the caller and enqueued only
# when the block returns a truthy value. Without a block, every key in the
# snapshot is enqueued.
#
# @param force_enqueue [Boolean] when true, skip the `queue_full?` check
#   (intended for flush-at-shutdown, where everything staged must be flushed)
# @yield [metadata, chunk] the stage key and the chunk stored under it
# @yieldreturn [Boolean] truthy to enqueue that chunk, falsy to leave it staged
# @return [Object] incidental: the key snapshot when the walk completes, nil
#   when it stops early because the queue is full
def enqueue_all(force_enqueue = false)
  log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id }
  update_timekeys if @enable_update_timekeys

  if block_given?
    synchronize{ @stage.keys }.each do |metadata|
      return if !force_enqueue && queue_full?
      # NOTE: The following line might cause data race depending on Ruby implementations except CRuby
      # cf. https://github.com/fluent/fluentd/pull/1721#discussion_r146170251
      chunk = @stage[metadata]
      # The key may have left the stage since the snapshot was taken.
      next unless chunk
      v = yield metadata, chunk
      enqueue_chunk(metadata) if v
    end
  else
    synchronize{ @stage.keys }.each do |metadata|
      return if !force_enqueue && queue_full?
      enqueue_chunk(metadata)
    end
  end
end