Class: Fluent::Plugin::Buffer

Inherits:
Base < Object
Includes:
OwnedByMixin, UniqueId::Mixin, MonitorMixin
Defined in:
lib/fluent/plugin/buffer/chunk.rb,
lib/fluent/plugin/buffer.rb,
lib/fluent/plugin/buffer/file_chunk.rb,
lib/fluent/plugin/buffer/memory_chunk.rb

Overview

Fluent::Plugin::Buffer is the base class for buffer plugins; concrete subclasses (FileBuffer, MemoryBuffer) implement #resume and #generate_chunk.

Direct Known Subclasses

FileBuffer, MemoryBuffer

Defined Under Namespace

Classes: BufferChunkOverflowError, BufferError, BufferOverflowError, Chunk, FileChunk, MemoryChunk, Metadata, ShouldRetry

Constant Summary

MINIMUM_APPEND_ATTEMPT_RECORDS = 10
DEFAULT_CHUNK_LIMIT_SIZE = 8 * 1024 * 1024    # 8MB
DEFAULT_TOTAL_LIMIT_SIZE = 512 * 1024 * 1024  # 512MB, same as v0.12 (BufferedOutput + buf_memory: 64 x 8MB)
DEFAULT_CHUNK_FULL_THRESHOLD = 0.95

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Base

#under_plugin_development

Instance Method Summary

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from OwnedByMixin

#log, #owner, #owner=

Methods inherited from Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initialize ⇒ Buffer

Returns a new instance of Buffer.



# File 'lib/fluent/plugin/buffer.rb', line 71

def initialize
  super

  @chunk_limit_size = nil
  @total_limit_size = nil
  @queue_limit_length = nil
  @chunk_limit_records = nil

  @stage = {}    #=> Hash (metadata -> chunk) : not flushed yet
  @queue = []    #=> Array (chunks)           : already flushed (not written)
  @dequeued = {} #=> Hash (unique_id -> chunk): already written (not purged)
  @queued_num = {} # metadata => int (number of queued chunks)

  @stage_size = @queue_size = 0
  @metadata_list = [] # keys of @stage
end

Instance Attribute Details

#dequeued ⇒ Object (readonly)

Returns the value of attribute dequeued.



# File 'lib/fluent/plugin/buffer.rb', line 69

def dequeued
  @dequeued
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/fluent/plugin/buffer.rb', line 69

def queue
  @queue
end

#queue_size ⇒ Object

for tests



# File 'lib/fluent/plugin/buffer.rb', line 68

def queue_size
  @queue_size
end

#queued_num ⇒ Object (readonly)

Returns the value of attribute queued_num.



# File 'lib/fluent/plugin/buffer.rb', line 69

def queued_num
  @queued_num
end

#stage ⇒ Object (readonly)

Returns the value of attribute stage.



# File 'lib/fluent/plugin/buffer.rb', line 69

def stage
  @stage
end

#stage_size ⇒ Object

for tests



# File 'lib/fluent/plugin/buffer.rb', line 68

def stage_size
  @stage_size
end

Instance Method Details

#add_metadata(metadata) ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 174

def add_metadata(metadata)
  log.trace "adding metadata", instance: self.object_id, metadata: metadata
  synchronize do
    if i = @metadata_list.index(metadata)
      @metadata_list[i]
    else
      @metadata_list << metadata
      metadata
    end
  end
end

#chunk_size_full?(chunk) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/buffer.rb', line 439

def chunk_size_full?(chunk)
  chunk.bytesize >= @chunk_limit_size * @chunk_full_threshold || (@chunk_limit_records && chunk.size >= @chunk_limit_records * @chunk_full_threshold)
end
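
With the defaults above (DEFAULT_CHUNK_LIMIT_SIZE = 8MB, DEFAULT_CHUNK_FULL_THRESHOLD = 0.95), a chunk counts as "full" once it holds roughly 7.6MiB. A back-of-the-envelope check:

chunk_limit_size     = 8 * 1024 * 1024  # DEFAULT_CHUNK_LIMIT_SIZE
chunk_full_threshold = 0.95             # DEFAULT_CHUNK_FULL_THRESHOLD
chunk_limit_size * chunk_full_threshold #=> 7969177.6 bytes, about 7.6 MiB

So chunks are enqueued slightly before they hit the hard limit enforced by #chunk_size_over?, leaving headroom for the last append.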

#chunk_size_over?(chunk) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/buffer.rb', line 435

def chunk_size_over?(chunk)
  chunk.bytesize > @chunk_limit_size || (@chunk_limit_records && chunk.size > @chunk_limit_records)
end

#clear_queue!Object



# File 'lib/fluent/plugin/buffer.rb', line 418

def clear_queue!
  log.trace "clearing queue", instance: self.object_id
  synchronize do
    until @queue.empty?
      begin
        q = @queue.shift
        log.trace("purging a chunk in queue"){ {id: dump_unique_id_hex(chunk.unique_id), bytesize: chunk.bytesize, size: chunk.size} }
        q.purge
      rescue => e
        log.error "unexpected error while clearing buffer queue", error_class: e.class, error: e
        log.error_backtrace
      end
    end
    @queue_size = 0
  end
end

#close ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 117

def close
  super
  synchronize do
    log.debug "closing buffer", instance: self.object_id
    @dequeued.each_pair do |chunk_id, chunk|
      chunk.close
    end
    until @queue.empty?
      @queue.shift.close
    end
    @stage.each_pair do |metadata, chunk|
      chunk.close
    end
  end
end

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 92

def configure(conf)
  super

  unless @queue_limit_length.nil?
    @total_limit_size = @chunk_limit_size * @queue_limit_length
  end
end

#dequeue_chunk ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 366

def dequeue_chunk
  return nil if @queue.empty?
  log.trace "dequeueing a chunk", instance: self.object_id
  synchronize do
    chunk = @queue.shift

    # this chunk was dequeued by another thread just before "synchronize" in this thread
    return nil unless chunk

    @dequeued[chunk.unique_id] = chunk
    @queued_num[chunk.metadata] -= 1 # BUG if nil, 0 or subzero
    log.trace "chunk dequeued", instance: self.object_id, metadata: chunk.metadata
    chunk
  end
end

#enqueue_all ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 349

def enqueue_all
  log.trace "enqueueing all chunks in buffer", instance: self.object_id
  synchronize do
    if block_given?
      @stage.keys.each do |metadata|
        chunk = @stage[metadata]
        v = yield metadata, chunk
        enqueue_chunk(metadata) if v
      end
    else
      @stage.keys.each do |metadata|
        enqueue_chunk(metadata)
      end
    end
  end
end
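
For example (a usage sketch; `buffer` is any configured Buffer instance):

# enqueue every staged chunk unconditionally
buffer.enqueue_all

# or enqueue selectively: the block receives (metadata, chunk), and a chunk
# is enqueued only when the block returns a truthy value
buffer.enqueue_all do |metadata, chunk|
  chunk.bytesize > 0
end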

#enqueue_chunk(metadata) ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 314

def enqueue_chunk(metadata)
  log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata
  synchronize do
    chunk = @stage.delete(metadata)
    return nil unless chunk

    chunk.synchronize do
      if chunk.empty?
        chunk.close
      else
        @queue << chunk
        @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
        chunk.enqueued! if chunk.respond_to?(:enqueued!)
      end
    end
    bytesize = chunk.bytesize
    @stage_size -= bytesize
    @queue_size += bytesize
  end
  nil
end

#enqueue_unstaged_chunk(chunk) ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 336

def enqueue_unstaged_chunk(chunk)
  log.trace "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.
  synchronize do
    chunk.synchronize do
       = chunk.
      @queue << chunk
      @queued_num[] = @queued_num.fetch(, 0) + 1
      chunk.enqueued! if chunk.respond_to?(:enqueued!)
    end
    @queue_size += chunk.bytesize
  end
end

#generate_chunk(metadata) ⇒ Object

Raises:

  • (NotImplementedError)


# File 'lib/fluent/plugin/buffer.rb', line 153

def generate_chunk(metadata)
  raise NotImplementedError, "Implement this method in child class"
end

#metadata(timekey: nil, tag: nil, variables: nil) ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 186

def metadata(timekey: nil, tag: nil, variables: nil)
  meta = new_metadata(timekey: timekey, tag: tag, variables: variables)
  add_metadata(meta)
end
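
Because the result is registered via #add_metadata, repeated calls with the same keys return the same registered Metadata instance (Metadata is a Struct, so #index matches it by value), which satisfies the consistent-object_id requirement of #write. A sketch:

m1 = buffer.metadata(tag: 'app.log')
m2 = buffer.metadata(tag: 'app.log')
m1.object_id == m2.object_id #=> true, the previously registered instance is returned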

#metadata_list ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 157

def metadata_list
  synchronize do
    @metadata_list.dup
  end
end

#metadata_list_clear! ⇒ Object

This is dangerous; use it only with great care, e.g. to remove metadata in tests.



# File 'lib/fluent/plugin/buffer.rb', line 164

def metadata_list_clear!
  synchronize do
    @metadata_list.clear
  end
end

#new_metadata(timekey: nil, tag: nil, variables: nil) ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 170

def new_metadata(timekey: nil, tag: nil, variables: nil)
  Metadata.new(timekey, tag, variables)
end

#persistent? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/buffer.rb', line 88

def persistent?
  false
end

#purge_chunk(chunk_id) ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 394

def purge_chunk(chunk_id)
  synchronize do
    chunk = @dequeued.delete(chunk_id)
    return nil unless chunk # purged by other threads

    metadata = chunk.metadata
    log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
    begin
      bytesize = chunk.bytesize
      chunk.purge
      @queue_size -= bytesize
    rescue => e
      log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e
      log.error_backtrace
    end

    if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1)
      @metadata_list.delete(metadata)
    end
    log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
  end
  nil
end
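
#dequeue_chunk, #takeback_chunk and #purge_chunk together form the write lifecycle driven by output plugins. A hedged sketch (`deliver` is a hypothetical destination write, not part of this class):

chunk = buffer.dequeue_chunk            # queue -> @dequeued
if chunk
  begin
    deliver(chunk.read)                 # attempt to ship the chunk's content
    buffer.purge_chunk(chunk.unique_id) # success: remove the chunk for good
  rescue
    buffer.takeback_chunk(chunk.unique_id) # failure: return it to the head of the queue for retry
  end
end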

#queued?(metadata = nil) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/buffer.rb', line 303

def queued?(metadata = nil)
  synchronize do
    if metadata
      n = @queued_num[metadata]
      n && n.nonzero?
    else
      !@queue.empty?
    end
  end
end
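
For instance:

buffer.queued?           #=> true if any chunk is waiting in the queue
buffer.queued?(metadata) #=> truthy only while @queued_num[metadata] is nonzero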

#queued_records ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 299

def queued_records
  synchronize { @queue.reduce(0){|r, chunk| r + chunk.size } }
end

#resume ⇒ Object

TODO: for back pressure feature

def used?(ratio)
  @total_limit_size * ratio > @stage_size + @queue_size
end

Raises:

  • (NotImplementedError)


# File 'lib/fluent/plugin/buffer.rb', line 148

def resume
  # return {}, []
  raise NotImplementedError, "Implement this method in child class"
end
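
Subclasses such as MemoryBuffer implement #resume together with #generate_chunk. A minimal in-memory sketch (illustrative only; the real implementation lives in lib/fluent/plugin/buf_memory.rb):

class MyMemoryBuffer < Fluent::Plugin::Buffer
  def resume
    # nothing is persisted, so there is nothing to recover:
    # an empty stage (metadata => chunk) and an empty queue (array of chunks)
    return {}, []
  end

  def generate_chunk(metadata)
    Fluent::Plugin::Buffer::MemoryChunk.new(metadata)
  end
end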

#start ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 100

def start
  super

  @stage, @queue = resume
  @stage.each_pair do |metadata, chunk|
    @metadata_list << metadata unless @metadata_list.include?(metadata)
    @stage_size += chunk.bytesize
  end
  @queue.each do |chunk|
    @metadata_list << chunk.metadata unless @metadata_list.include?(chunk.metadata)
    @queued_num[chunk.metadata] ||= 0
    @queued_num[chunk.metadata] += 1
    @queue_size += chunk.bytesize
  end
  log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
end

#storable? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/buffer.rb', line 139

def storable?
  @total_limit_size > @stage_size + @queue_size
end

#takeback_chunk(chunk_id) ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 382

def takeback_chunk(chunk_id)
  log.trace "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id)
  synchronize do
    chunk = @dequeued.delete(chunk_id)
    return false unless chunk # already purged by other thread
    @queue.unshift(chunk)
    log.trace "chunk taken back", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: chunk.
    @queued_num[chunk.] += 1 # BUG if nil
  end
  true
end

#terminate ⇒ Object



# File 'lib/fluent/plugin/buffer.rb', line 133

def terminate
  super
  @dequeued = @stage = @queue = @queued_num = @metadata_list = nil
  @stage_size = @queue_size = 0
end

#write(metadata_and_data, format: nil, size: nil, enqueue: false) ⇒ Object

  • metadata_and_data MUST be a Hash of { metadata => data }
  • metadata MUST have a consistent object_id for each variation
  • data MUST be an Array of serialized events, or an EventStream
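
A usage sketch (the event stream and the format proc are illustrative; real output plugins build these from their own configuration):

meta = buffer.metadata(tag: 'app.log')
es   = Fluent::ArrayEventStream.new([[Fluent::EventTime.now, {"message" => "hi"}]])

# append the events; pass enqueue: true to push the chunk into the flush
# queue immediately instead of waiting for it to become full
buffer.write({ meta => es }, format: ->(data){ data.to_msgpack_stream }, enqueue: false)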



# File 'lib/fluent/plugin/buffer.rb', line 194

def write(metadata_and_data, format: nil, size: nil, enqueue: false)
  return if metadata_and_data.size < 1
  raise BufferOverflowError, "buffer space has too many data" unless storable?

  log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size

  staged_bytesize = 0
  operated_chunks = []
  unstaged_chunks = {} # metadata => [chunk, chunk, ...]
  chunks_to_enqueue = []

  begin
    metadata_and_data.each do |metadata, data|
      write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
        chunk.mon_enter # lock the chunk to prevent it from being committed/rolled back by other threads
        operated_chunks << chunk
        if chunk.staged?
          staged_bytesize += adding_bytesize
        elsif chunk.unstaged?
          unstaged_chunks[metadata] ||= []
          unstaged_chunks[metadata] << chunk
        end
      end
    end

    return if operated_chunks.empty?

    # At this point this thread holds locks on many chunks; taking the buffer-global lock now could deadlock.
    # Any operation that needs the buffer-global lock (including enqueueing) must be done after releasing the chunk locks.

    first_chunk = operated_chunks.shift
    # Following commits for other chunks also can finish successfully if the first commit operation
    # finishes without any exceptions.
    # In most cases, #commit just requires very small disk spaces, so major failure reason are
    # permission errors, disk failures and other permanent(fatal) errors.
    begin
      first_chunk.commit
      if enqueue || first_chunk.unstaged? || chunk_size_full?(first_chunk)
        chunks_to_enqueue << first_chunk
      end
      first_chunk.mon_exit
    rescue
      operated_chunks.unshift(first_chunk)
      raise
    end

    errors = []
    # The buffer plugin assumes there is no serious error cause: it will try to commit all chunks either way
    operated_chunks.each do |chunk|
      begin
        chunk.commit
        if enqueue || chunk.unstaged? || chunk_size_full?(chunk)
          chunks_to_enqueue << chunk
        end
        chunk.mon_exit
      rescue => e
        chunk.rollback
        chunk.mon_exit
        errors << e
      end
    end

    # All locks about chunks are released.

    synchronize do
      # At here, staged chunks may be enqueued by other threads.
      @stage_size += staged_bytesize

      chunks_to_enqueue.each do |c|
        if c.staged? && (enqueue || chunk_size_full?(c))
          m = c.metadata
          enqueue_chunk(m)
          if unstaged_chunks[m]
            u = unstaged_chunks[m].pop
            if u.unstaged? && !chunk_size_full?(u)
              @stage[m] = u.staged!
              @stage_size += u.bytesize
            end
          end
        elsif c.unstaged?
          enqueue_unstaged_chunk(c)
        else
          # previously staged chunk is already enqueued, closed or purged.
          # no problem.
        end
      end
    end

    operated_chunks.clear if errors.empty?

    if errors.size > 0
      log.warn "error occurs in committing chunks: only first one raised", errors: errors.map(&:class)
      raise errors.first
    end
  ensure
    operated_chunks.each do |chunk|
      chunk.rollback rescue nil # nothing possible to do for #rollback failure
      if chunk.unstaged?
        chunk.purge rescue nil # to prevent leakage of unstaged chunks
      end
      chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
    end
  end
end

#write_once(metadata, data, format: nil, size: nil, &block) ⇒ Object

write once into a chunk (a condensed sketch of the retry pattern follows this list)

  1. append whole data into existing chunk

  2. commit it & return unless chunk_size_over?

  3. enqueue existing chunk & retry whole method if chunk was not empty

  4. go to step_by_step writing
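
The retry in steps 3-4 is driven by the ShouldRetry exception: the staged chunk may be enqueued by another thread between looking it up and locking it, so the whole method restarts. A simplified, hypothetical condensation of that pattern:

# illustrative only -- condensed from the full source below
def write_once(metadata, data)
  chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata).staged! }
  chunk.synchronize do
    raise ShouldRetry unless chunk.staged? # another thread enqueued it first
    chunk.append(data)
  end
rescue ShouldRetry
  retry # re-fetch (or re-create) the staged chunk and try again
end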



# File 'lib/fluent/plugin/buffer.rb', line 451

def write_once(metadata, data, format: nil, size: nil, &block)
  return if data.empty?

  stored = false
  adding_bytesize = nil

  chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata).staged! }
  enqueue_chunk_before_retry = false
  chunk.synchronize do
    # retry this method if chunk is already queued (between getting chunk and entering critical section)
    raise ShouldRetry unless chunk.staged?

    empty_chunk = chunk.empty?

    original_bytesize = chunk.bytesize
    begin
      if format
        serialized = format.call(data)
        chunk.concat(serialized, size ? size.call : data.size)
      else
        chunk.append(data, compress: @compress)
      end
      adding_bytesize = chunk.bytesize - original_bytesize

      if chunk_size_over?(chunk)
        if format && empty_chunk
          log.warn "chunk bytes limit exceeds for an emitted event stream: #{adding_bytesize}bytes"
        end
        chunk.rollback

        if format && !empty_chunk
          # Event streams should be appended into a chunk at once
          # as far as possible, to improve performance of formatting.
          # Event stream may be a MessagePackEventStream. We don't want to split it into
          # 2 or more chunks (except for a case that the event stream is larger than chunk limit).
          enqueue_chunk_before_retry = true
          raise ShouldRetry
        end
      else
        stored = true
      end
    rescue
      chunk.rollback
      raise
    end

    if stored
      block.call(chunk, adding_bytesize)
    end
  end

  unless stored
    # try step-by-step appending if data can't be stored into an existing chunk in non-bulk mode
    #
    # 1/10 the size of the original event stream (splits_count == 10) seems small enough
    # to try emitting events into the existing chunk.
    # it does not matter if the event stream is split into very small pieces, because chunks have little
    # overhead for writing data many times (even for file buffer chunks).
    write_step_by_step(metadata, data, format, 10, &block)
  end
rescue ShouldRetry
  enqueue_chunk(metadata) if enqueue_chunk_before_retry
  retry
end

#write_step_by_step(metadata, data, format, splits_count, &block) ⇒ Object

  1. split event streams into many (10 -> 100 -> 1000 -> …) chunks (a worked slice-size example follows this list)

  2. append splits into the staged chunks as much as possible

  3. create unstaged chunk and append rest splits -> repeat it for all splits
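
The slice size in step 1 uses an uneven-division rule: when data.size divides evenly by splits_count the slices are exact, otherwise the divisor becomes splits_count - 1 and the final slice may be short. A worked example:

data_size    = 25
splits_count = 10
slice_size   = data_size % splits_count == 0 ? data_size / splits_count : data_size / (splits_count - 1)
#=> 25 / 9 == 2, yielding 13 slices of at most 2 events each

When a split overflows a chunk that is not yet full, splits_count is multiplied by 10 (10 -> 100 -> 1000 -> …) before retrying, so oversized appends shrink quickly.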



# File 'lib/fluent/plugin/buffer.rb', line 523

def write_step_by_step(metadata, data, format, splits_count, &block)
  splits = []
  if splits_count > data.size
    splits_count = data.size
  end
  slice_size = if data.size % splits_count == 0
                 data.size / splits_count
               else
                 data.size / (splits_count - 1)
               end
  slice_origin = 0
  while slice_origin < data.size
    splits << data.slice(slice_origin, slice_size)
    slice_origin += slice_size
  end

  # This method will append events into the staged chunk at first.
  # Then, will generate chunks not staged (not queued) to append rest data.
  staged_chunk_used = false
  modified_chunks = []
  get_next_chunk = ->(){
    c = if staged_chunk_used
          # Staging a new chunk here is a bad idea:
          # recovering the whole state including newly staged chunks is much harder than the current implementation.
          generate_chunk(metadata)
        else
          synchronize{ @stage[metadata] ||= generate_chunk(metadata).staged! }
        end
    modified_chunks << c
    c
  }

  writing_splits_index = 0
  enqueue_chunk_before_retry = false

  while writing_splits_index < splits.size
    chunk = get_next_chunk.call
    chunk.synchronize do
      raise ShouldRetry unless chunk.writable?
      staged_chunk_used = true if chunk.staged?

      original_bytesize = chunk.bytesize
      begin
        while writing_splits_index < splits.size
          split = splits[writing_splits_index]
          if format
            chunk.concat(format.call(split), split.size)
          else
            chunk.append(split, compress: @compress)
          end

          if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
            chunk.rollback

            if split.size == 1 && original_bytesize == 0
              big_record_size = format ? format.call(split).bytesize : split.first.bytesize
              raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
            end

            if chunk_size_full?(chunk) || split.size == 1
              enqueue_chunk_before_retry = true
            else
              splits_count *= 10
            end

            raise ShouldRetry
          end

          writing_splits_index += 1

          if chunk_size_full?(chunk)
            break
          end
        end
      rescue
        chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it
        raise
      end

      block.call(chunk, chunk.bytesize - original_bytesize)
    end
  end
rescue ShouldRetry
  modified_chunks.each do |mc|
    mc.rollback rescue nil
    if mc.unstaged?
      mc.purge rescue nil
    end
  end
  enqueue_chunk(metadata) if enqueue_chunk_before_retry
  retry
end