Class: LogStash::Codecs::Fluent

Inherits:
Base
  • Object
Defined in:
lib/logstash/codecs/fluent.rb

Overview

This codec handles fluentd’s msgpack schema.

For example, you can receive logs from `fluent-logger-ruby` with:

    input {
      tcp {
        codec => fluent
        port => 4000
      }
    }

And from your ruby code in your own application:

    require "fluent-logger"

    logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
    logger.post("some_tag", { "your" => "data", "here" => "yay!" })

Notes:

  • Fluentd uses second-precision timestamps for events, so events processed by this codec never carry subsecond precision.
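
To make the precision note concrete, here is a minimal sketch using only Ruby's standard `Time` class. The helper name `truncate_to_seconds` is hypothetical; it only imitates what happens when a timestamp travels as whole epoch seconds and is rebuilt on the other side.

```ruby
# Hypothetical helper: imitates a timestamp that is sent as an integer
# number of epoch seconds and then turned back into a Time.
def truncate_to_seconds(time)
  Time.at(time.to_i).utc
end

original = Time.at(1_700_000_000, 123_456, :usec).utc
restored = truncate_to_seconds(original)

puts original.usec # microseconds are present before the round trip
puts restored.usec # microseconds are gone afterwards
```

If subsecond ordering matters downstream, one option is to carry the precise time in a separate field of the record rather than relying on the event time.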

Instance Method Details

#decode(data) ⇒ Object



# File 'lib/logstash/codecs/fluent.rb', line 38

def decode(data)
  @decoder.feed(data)
  @decoder.each do |tag, epochtime, map|
    event = LogStash::Event.new(map.merge(
      LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
      "tags" => tag
    ))
    yield event
  end
end
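
The mapping performed inside the loop above can be sketched without Logstash or msgpack. This is an illustration only: plain Hashes and `Time` stand in for `LogStash::Event` and `LogStash::Timestamp`, the helper name `entry_to_event_hash` is hypothetical, and the `"@timestamp"` key is assumed to be the value of `LogStash::Event::TIMESTAMP`.

```ruby
# Hypothetical stand-in for the body of the decode loop.
# Takes one already-unpacked entry and returns an event-like Hash.
def entry_to_event_hash(tag, epochtime, map)
  map.merge(
    "@timestamp" => Time.at(epochtime).utc, # assumed TIMESTAMP key
    "tags" => tag
  )
end

# One unpacked entry: [tag, epoch seconds, record].
entry = ["some_tag", 1_700_000_000, { "your" => "data", "here" => "yay!" }]
event = entry_to_event_hash(*entry)
puts event.inspect
```

Note that the tag is stored as a single string under `"tags"`, and that any `"tags"` or `"@timestamp"` key already present in the record is overwritten by the merge.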

#encode(event) ⇒ Object



# File 'lib/logstash/codecs/fluent.rb', line 50

def encode(event)
  tag = event.get("tags") || "log"
  epochtime = event.timestamp.to_i

  # use normalize to make sure returned Hash is pure Ruby for
  # MessagePack#pack which relies on pure Ruby object recognition
  data = LogStash::Util.normalize(event.to_hash)
  # timestamp is serialized as an ISO8601 string
  # merge (rather than merge!) leaves data unmodified, avoiding side effects when there are multiple outputs
  @on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
end
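
The shape of the payload built above can be sketched with the standard library alone. This is a simplified illustration, not the codec itself: a plain Hash replaces `LogStash::Event`, the helper name `build_fluent_entry` is hypothetical, the `"@timestamp"` key is an assumption, and the sketch stops before the msgpack packing step.

```ruby
require "time" # provides Time#iso8601

# Hypothetical stand-in for encode: builds the [tag, epochtime, record]
# triple that would then be handed to MessagePack.pack.
def build_fluent_entry(event_hash)
  tag = event_hash["tags"] || "log"
  timestamp = event_hash.fetch("@timestamp")
  # merge returns a new Hash, so the caller's event_hash is left untouched
  record = event_hash.merge("@timestamp" => timestamp.iso8601(3))
  [tag, timestamp.to_i, record]
end

source = { "@timestamp" => Time.at(1_700_000_000).utc, "message" => "hello" }
tag, epochtime, record = build_fluent_entry(source)
puts tag
puts epochtime
puts record["@timestamp"]
```

The entry therefore carries the time twice: as whole seconds in the second slot, and as an ISO8601 string inside the record.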

#register ⇒ Object



# File 'lib/logstash/codecs/fluent.rb', line 32

def register
  require "msgpack"
  @decoder = MessagePack::Unpacker.new
end