Class: Fluent::DetachProcessManager::DelayedForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/process.rb

Instance Method Summary collapse

Constructor Details

#initialize(w, interval) ⇒ DelayedForwarder

Returns a new instance of DelayedForwarder.



242
243
244
245
246
247
248
# File 'lib/fluent/process.rb', line 242

# Sets up an empty per-tag buffer and starts a background thread that
# executes #run.
#
# @param w [Object] destination that #run hands to +to_msgpack+ when it
#   writes out buffered pairs (presumably an IO-like writer; confirm
#   against the caller)
# @param interval [Numeric] value #run passes to +sleep+ between flushes
def initialize(w, interval)
  @mutex = Mutex.new # guards @buffer, which #emit and #run both touch
  @buffer = {}       # tag => accumulated msgpack stream
  @interval = interval
  @w = w
  # The thread handle is not kept; the thread simply executes #run.
  Thread.new { run }
end

Instance Method Details

#emit(tag, es) ⇒ Object



250
251
252
253
254
255
256
257
258
259
# File 'lib/fluent/process.rb', line 250

# Appends the msgpack-encoded form of an event stream to the buffer kept
# for +tag+. The data is only buffered here; #run drains it periodically.
#
# @param tag [Object] hash key under which the stream is accumulated
# @param es [#to_msgpack_stream] event stream; the value returned by its
#   +to_msgpack_stream+ must support +dup+ and be appendable with +<<+
# @return [Object] the buffered stream for +tag+ after this call
def emit(tag, es)
  # Encode before taking the lock so the critical section stays short.
  stream = es.to_msgpack_stream
  @mutex.synchronize do
    if (buffered = @buffer[tag])
      buffered << stream
    else
      # Keep a private copy: later appends mutate the buffered string in
      # place, and that must neither alter a string the caller may still
      # hold nor fail because the caller's string is frozen.
      @buffer[tag] = stream.dup
    end
  end
end

#run ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/fluent/process.rb', line 261

# Flush loop executed on the background thread. After each +sleep+ of
# +@interval+, every buffered tag is removed from +@buffer+ and written to
# +@w+ as a +[tag, stream]+ pair via +to_msgpack+.
#
# Does not return normally. Any StandardError is logged through +$log+ and
# then re-raised, which ends the loop.
def run
  while true
    sleep @interval

    # Detach everything currently buffered while holding the lock, so the
    # writes below happen without holding up #emit.
    drained = @mutex.synchronize do
      @buffer.keys.each_with_object([]) do |tag, acc|
        stream = @buffer.delete(tag)
        acc << [tag, stream] if stream
      end
    end

    drained.each { |entry| entry.to_msgpack(@w) }
  end
rescue => e
  # The log message text is reproduced verbatim.
  $log.error "error on forwerder thread", error: e.to_s
  $log.error_backtrace
  raise
end