Class: Fluent::LighteningBuffer
- Inherits: MemoryBuffer
  - Object
  - MemoryBuffer
  - Fluent::LighteningBuffer
- Defined in: lib/fluent/plugin/buf_lightening.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #emit(key, data, chain) ⇒ Object
  TODO: remove w/ fluentd v0.10.42 (or td-agent including fluentd v0.10.42).
- #new_chunk(key) ⇒ Object
- #storable?(chunk, data) ⇒ Boolean
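
Taken together, these methods extend MemoryBuffer so that a chunk counts as full once its record count reaches @buffer_chunk_records_limit, not only when it hits the byte-size limit. A minimal sketch of driving the buffer directly in a fluentd v0.10 environment; the empty configuration element and the NullOutputChain commit chain are illustrative choices, not requirements documented on this page:

require 'fluent/load'                      # fluentd v0.10 core
require 'fluent/plugin/buf_lightening'

buffer = Fluent::LighteningBuffer.new
buffer.configure(Fluent::Config::Element.new('buffer', '', {}, []))
buffer.start

chain = Fluent::NullOutputChain.instance   # no-op commit chain from fluentd core
flush_now = buffer.emit('', "formatted record payload\n", chain)
# flush_now is false while the payload still fits the current chunk; it becomes
# true when a full chunk has just been enqueued into an empty queue.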
Instance Method Details
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/buf_lightening.rb', line 24

def configure(conf)
  super
end
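
#configure only delegates to MemoryBuffer, yet #storable? reads @buffer_chunk_records_limit, so that setting is presumably declared as a class-level config_param that this page does not list. A hedged sketch of what such a declaration usually looks like in a fluentd v0.10 buffer plugin; the type and default are assumptions derived from how #storable? treats a nil value:

# Assumed class-level declaration (not shown on this page); super in #configure
# would populate @buffer_chunk_records_limit from it.
config_param :buffer_chunk_records_limit, :integer, :default => nil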
#emit(key, data, chain) ⇒ Object
TODO: remove w/ fluentd v0.10.42 (or td-agent including fluentd v0.10.42)
# File 'lib/fluent/plugin/buf_lightening.rb', line 39

def emit(key, data, chain)
  # copy&paste from BasicBuffer, and fix to add hook point
  key = key.to_s

  synchronize do
    top = (@map[key] ||= new_chunk(key))

    if storable?(top, data) # hook point (FIXED THIS LINE ONLY)
      chain.next
      top << data
      return false

    elsif @queue.size >= @buffer_queue_limit
      raise BufferQueueLimitError, "queue size exceeds limit"
    end

    if data.bytesize > @buffer_chunk_limit
      $log.warn "Size of the emitted data exceeds buffer_chunk_limit."
      $log.warn "This may occur problems in the output plugins ``at this server.``"
      $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
      $log.warn "in the forward output ``at the log forwarding server.``"
    end

    nc = new_chunk(key)
    ok = false

    begin
      nc << data
      chain.next

      flush_trigger = false
      @queue.synchronize {
        enqueue(top)
        flush_trigger = @queue.empty?
        @queue << top
        @map[key] = nc
      }

      ok = true
      return flush_trigger

    ensure
      nc.purge unless ok
    end

  end # synchronize
end
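
The return value matters to the caller: false means the data was appended to the current chunk, while the flush_trigger path returns true when a freshly enqueued chunk landed in a previously empty queue. A hedged sketch of the caller side, following the fluentd v0.10 BufferedOutput pattern (format_stream and submit_flush are BufferedOutput methods; this class is an illustration, not part of buf_lightening.rb):

require 'fluent/load'

class IllustrativeBufferedOutput < Fluent::BufferedOutput
  def emit(tag, es, chain, key = '')
    data = format_stream(tag, es)      # serialize the event stream into a chunk payload
    if @buffer.emit(key, data, chain)  # true: a chunk was enqueued into an empty queue
      submit_flush                     # wake the flush thread so the chunk is written promptly
    end
  end
end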
#new_chunk(key) ⇒ Object
# File 'lib/fluent/plugin/buf_lightening.rb', line 28

def new_chunk(key)
  LighteningBufferChunk.new(key)
end
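
new_chunk returns a LighteningBufferChunk, which is defined elsewhere in the same file and not documented on this page. From how #emit and #storable? use it, the chunk must at least support <<, size, purge, and record_counter. A purely illustrative duck-type sketch of that assumed interface; how the real class counts records is an assumption here:

# Illustrative stand-in for the chunk interface assumed by #emit and #storable?;
# this is not the real LighteningBufferChunk.
class RecordCountingChunkSketch
  attr_reader :key, :record_counter

  def initialize(key)
    @key = key
    @data = ''.dup
    @record_counter = 0
  end

  def <<(data)            # append one emitted payload; assumed to bump the record count
    @data << data
    @record_counter += 1
  end

  def size                # bytes buffered so far, compared against @buffer_chunk_limit
    @data.bytesize
  end

  def purge               # discard buffered data when emit fails (see ensure in #emit)
    @data = ''.dup
    @record_counter = 0
  end
end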
#storable?(chunk, data) ⇒ Boolean
# File 'lib/fluent/plugin/buf_lightening.rb', line 32

def storable?(chunk, data)
  return false if chunk.size + data.bytesize > @buffer_chunk_limit
  return false if @buffer_chunk_records_limit && chunk.record_counter >= @buffer_chunk_records_limit
  true
end
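
With concrete numbers, the two checks read: reject the chunk if appending the data would exceed the byte limit, or if a record-count limit is configured and already reached. A self-contained restatement of the same decision outside the plugin; the parameter names and limit values are illustrative, not defaults taken from the plugin:

def storable_sketch?(chunk_size, chunk_records, data, chunk_limit:, records_limit:)
  return false if chunk_size + data.bytesize > chunk_limit
  return false if records_limit && chunk_records >= records_limit
  true
end

storable_sketch?(1_000, 9,  'x' * 100, chunk_limit: 8 * 1024 * 1024, records_limit: 10)   # => true
storable_sketch?(1_000, 10, 'x' * 100, chunk_limit: 8 * 1024 * 1024, records_limit: 10)   # => false (record limit hit)
storable_sketch?(8 * 1024 * 1024, 1, 'x', chunk_limit: 8 * 1024 * 1024, records_limit: nil) # => false (byte limit hit)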