Class: Fluent::LighteningBuffer

Inherits:
MemoryBuffer
  • Object
show all
Defined in:
lib/fluent/plugin/buf_lightening.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



24
25
26
# File 'lib/fluent/plugin/buf_lightening.rb', line 24

def configure(conf)
  super
end

#emit(key, data, chain) ⇒ Object

TODO: remove w/ fluentd v0.10.42 (or td-agent including fluentd v0.10.42)



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/buf_lightening.rb', line 39

def emit(key, data, chain) # copy&paste from BasicBuffer, and fix to add hook point
  key = key.to_s

  synchronize do
    top = (@map[key] ||= new_chunk(key))

    if storable?(top, data) # hook point (FIXED THIS LINE ONLY)
      chain.next
      top << data
      return false
    elsif @queue.size >= @buffer_queue_limit
      raise BufferQueueLimitError, "queue size exceeds limit"
    end

    if data.bytesize > @buffer_chunk_limit
      $log.warn "Size of the emitted data exceeds buffer_chunk_limit."
      $log.warn "This may occur problems in the output plugins ``at this server.``"
      $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
      $log.warn "in the forward output ``at the log forwarding server.``"
    end

    nc = new_chunk(key)
    ok = false

    begin
      nc << data
      chain.next

      flush_trigger = false
      @queue.synchronize {
        enqueue(top)
        flush_trigger = @queue.empty?
        @queue << top
        @map[key] = nc
      }

      ok = true
      return flush_trigger
    ensure
      nc.purge unless ok
    end

  end  # synchronize
end

#new_chunk(key) ⇒ Object



28
29
30
# File 'lib/fluent/plugin/buf_lightening.rb', line 28

def new_chunk(key)
  LighteningBufferChunk.new(key)
end

#storable?(chunk, data) ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
35
36
# File 'lib/fluent/plugin/buf_lightening.rb', line 32

def storable?(chunk, data)
  return false if chunk.size + data.bytesize > @buffer_chunk_limit
  return false if @buffer_chunk_records_limit && chunk.record_counter >= @buffer_chunk_records_limit
  true
end