Class: Fluent::DetachProcessManager::DelayedForwarder
- Inherits: Object
  - Object
  - Fluent::DetachProcessManager::DelayedForwarder
- Defined in: lib/fluent/process.rb
Instance Method Summary
- #emit(tag, es) ⇒ Object
- #initialize(w, interval) ⇒ DelayedForwarder (constructor): A new instance of DelayedForwarder.
- #run ⇒ Object
Constructor Details
#initialize(w, interval) ⇒ DelayedForwarder
Returns a new instance of DelayedForwarder.
    # File 'lib/fluent/process.rb', line 242
    def initialize(w, interval)
      @w = w
      @interval = interval
      @buffer = {}
      @mutex = Mutex.new
      Thread.new(&method(:run))
    end
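The constructor starts its background thread with `Thread.new(&method(:run))`, which binds the new thread to the instance's own `#run` method. The following is a minimal sketch of that idiom only. The `Ticker` class, its `limit` parameter, and the stored `thread` reader are hypothetical names added for illustration; `DelayedForwarder` itself loops indefinitely and does not keep a reference to its thread.

```ruby
# Hypothetical Ticker class, for illustration only; not part of Fluentd.
class Ticker
  attr_reader :ticks, :thread

  def initialize(interval, limit)
    @interval = interval
    @limit = limit
    @ticks = 0
    # method(:run) returns a Method object; the & operator converts it to a
    # block, so the new thread executes #run on this instance.
    @thread = Thread.new(&method(:run))
  end

  # Runs on the background thread: sleep, then count, a fixed number of times.
  def run
    @limit.times do
      sleep @interval
      @ticks += 1
    end
  end
end

ticker = Ticker.new(0.01, 3)
ticker.thread.join # wait for the bounded loop to finish
puts ticker.ticks
```

Keeping the thread in an instance variable, as this sketch does, makes it possible to join it later; the constructor documented above discards the return value of `Thread.new`.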
Instance Method Details
#emit(tag, es) ⇒ Object
    # File 'lib/fluent/process.rb', line 250
    def emit(tag, es)
      stream = es.to_msgpack_stream
      @mutex.synchronize do
        if @buffer[tag]
          @buffer[tag] << stream
        else
          @buffer[tag] = stream
        end
      end
    end
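The effect of `#emit` is that successive streams for the same tag are concatenated into one buffer entry, while each distinct tag gets its own entry. A rough sketch of that behavior follows, using only the standard library. `FakeStream`, `TagBuffer`, and `snapshot` are hypothetical stand-ins, and plain strings take the place of the MessagePack bytes a real event stream would return from `to_msgpack_stream`.

```ruby
# Hypothetical stand-in for an event stream. Plain strings are used here
# instead of real MessagePack-encoded bytes.
FakeStream = Struct.new(:payload) do
  def to_msgpack_stream
    payload.dup
  end
end

# Hypothetical class that mirrors only the buffering logic of #emit.
class TagBuffer
  def initialize
    @buffer = {}
    @mutex = Mutex.new
  end

  def emit(tag, es)
    stream = es.to_msgpack_stream
    @mutex.synchronize do
      if @buffer[tag]
        @buffer[tag] << stream # append to the existing entry for this tag
      else
        @buffer[tag] = stream  # first stream seen for this tag
      end
    end
  end

  # Returns a copy of the buffer so callers cannot mutate internal state.
  def snapshot
    @mutex.synchronize { @buffer.transform_values(&:dup) }
  end
end

buf = TagBuffer.new
buf.emit("app.access", FakeStream.new("a"))
buf.emit("app.access", FakeStream.new("b"))
buf.emit("app.error", FakeStream.new("c"))
p buf.snapshot
```

In this sketch the two `app.access` payloads end up joined under a single key, and `app.error` is held separately.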
#run ⇒ Object
    # File 'lib/fluent/process.rb', line 261
    def run
      while true
        sleep @interval
        pairs = []
        @mutex.synchronize do
          @buffer.keys.each do |tag|
            if ms = @buffer.delete(tag)
              pairs << [tag, ms]
            end
          end
        end
        pairs.each do |pair|
          pair.to_msgpack(@w)
        end
      end
    rescue
      $log.error "error on forwerder thread", error: $!.to_s
      $log.error_backtrace
      raise
    end
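Each pass of the `#run` loop drains the buffer while holding the mutex, then writes the collected `[tag, stream]` pairs to `@w` after the lock has been released, so a slow write does not block callers of `#emit`. The sketch below shows a single pass of that pattern under some loud assumptions: `Drainer` and `flush_once` are hypothetical names, JSON lines replace the MessagePack framing of the real class, and an `IO.pipe` stands in for whatever `w` is in practice.

```ruby
require "json"

# Hypothetical Drainer, for illustration only; not part of Fluentd.
# JSON lines stand in for the MessagePack framing used by the real class.
class Drainer
  def initialize(w)
    @w = w
    @buffer = {}
    @mutex = Mutex.new
  end

  def emit(tag, data)
    @mutex.synchronize { (@buffer[tag] ||= String.new) << data }
  end

  # One iteration of the loop body, without the sleep.
  def flush_once
    pairs = []
    # Hold the lock only long enough to move entries out of the buffer.
    @mutex.synchronize do
      @buffer.keys.each do |tag|
        ms = @buffer.delete(tag)
        pairs << [tag, ms] if ms
      end
    end
    # Perform the I/O after the lock has been released.
    pairs.each { |pair| @w.puts(JSON.generate(pair)) }
    @w.flush
    pairs.size
  end
end

r, w = IO.pipe
drainer = Drainer.new(w)
drainer.emit("app.access", "ab")
drainer.emit("app.error", "c")
flushed = drainer.flush_once
w.close # signal end of stream to the reader
received = r.each_line.map { |line| JSON.parse(line) }
r.close
p received
```

Because the buffer is emptied inside the synchronized block, data emitted during the write phase lands in a fresh entry and is picked up on the next pass.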