Class: Fluent::DetachProcessManager::DelayedForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/process.rb

Instance Method Summary collapse

Constructor Details

#initialize(w, interval) ⇒ DelayedForwarder

Returns a new instance of DelayedForwarder.



237
238
239
240
241
242
243
# File 'lib/fluent/process.rb', line 237

def initialize(w, interval)
  @w = w
  @interval = interval
  @buffer = {}
  @mutex = Mutex.new
  Thread.new(&method(:run))
end

Instance Method Details

#emit(tag, es) ⇒ Object



245
246
247
248
249
250
251
252
253
254
# File 'lib/fluent/process.rb', line 245

def emit(tag, es)
  stream = es.to_msgpack_stream
  @mutex.synchronize do
    if @buffer[tag]
      @buffer[tag] << stream
    else
      @buffer[tag] = stream
    end
  end
end

#runObject



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/fluent/process.rb', line 256

def run
  while true
    sleep @interval

    pairs = []
    @mutex.synchronize do
      @buffer.keys.each do |tag|
        if ms = @buffer.delete(tag)
          pairs << [tag, ms]
        end
      end
    end
    pairs.each do |pair|
      pair.to_msgpack(@w)
    end
  end
rescue
  $log.error "error on forwerder thread", error: $!.to_s
  $log.error_backtrace
  raise
end