Class: LogStash::Codecs::Fluent
- Inherits: Base (hierarchy: Object, Base, LogStash::Codecs::Fluent)
- Defined in: lib/logstash/codecs/fluent.rb
Overview
This codec handles fluentd’s msgpack schema.
For example, you can receive logs from `fluent-logger-ruby` with:
    input {
      tcp {
        codec => fluent
        port => 4000
      }
    }
And from the Ruby code in your own application:
    require "fluent-logger"
    logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
    logger.post("some_tag", { "your" => "data", "here" => "yay!" })
Notes:
- Fluent uses second-precision timestamps for events, so you will never see subsecond precision on events processed by this codec.
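To make the note above concrete, here is a minimal sketch in plain Ruby (no Logstash or Fluent classes involved) of what happens when a timestamp travels as integer epoch seconds. The variable names are illustrative only.

```ruby
# Illustration only: a timestamp carried as integer epoch seconds
# cannot preserve its fractional part.
original  = Time.at(1_700_000_000, 123_456) # second argument is microseconds
epochtime = original.to_i                   # integer seconds, fraction dropped
restored  = Time.at(epochtime)              # what a receiver can rebuild

puts original.usec # microseconds present on the sender side
puts restored.usec # microseconds after the round trip through whole seconds
```

The first value is the microsecond component of the original time; the second is zero, because the fraction was discarded by `to_i`.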
Instance Method Details
#decode(data) ⇒ Object
    # File 'lib/logstash/codecs/fluent.rb', line 38
    def decode(data)
      @decoder.feed(data)
      @decoder.each do |tag, epochtime, map|
        event = LogStash::Event.new(map.merge(
          LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
          "tags" => tag
        ))
        yield event
      end
    end
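The field-merging step in `decode` can be sketched without Logstash or msgpack. The snippet below assumes an already-unpacked `[tag, epochtime, map]` entry and uses a plain `Hash` and `Time` as stand-ins for `LogStash::Event` and `LogStash::Timestamp`; `TIMESTAMP_KEY` and `fields_for` are hypothetical names introduced for illustration.

```ruby
# Stand-in for LogStash::Event::TIMESTAMP, which is the "@timestamp" field name.
TIMESTAMP_KEY = "@timestamp"

# Hypothetical helper: build the field hash for one decoded entry.
# Hash#merge returns a new hash, and the merged-in keys win, so a "tags"
# key already present in the record is replaced by the Fluent tag.
def fields_for(tag, epochtime, map)
  map.merge(TIMESTAMP_KEY => Time.at(epochtime).utc, "tags" => tag)
end

entry  = ["some_tag", 1_700_000_000, { "your" => "data", "tags" => "from_record" }]
fields = fields_for(*entry)

puts fields["tags"]             # the Fluent tag, not the record's own value
puts fields[TIMESTAMP_KEY].to_i # the epoch seconds carried in the entry
```

Because `merge` is non-destructive, the original record hash in `entry` is left untouched.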
#encode(event) ⇒ Object
    # File 'lib/logstash/codecs/fluent.rb', line 50
    def encode(event)
      tag = event.get("tags") || "log"
      epochtime = event.timestamp.to_i

      # use normalize to make sure the returned Hash is pure Ruby for
      # MessagePack#pack, which relies on pure Ruby object recognition
      data = LogStash::Util.normalize(event.to_hash)
      # the timestamp is serialized as an ISO 8601 string
      # merge to avoid modifying data, which could have side effects with multiple outputs
      @on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
    end
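As a rough sketch of the structure `encode` hands to `MessagePack.pack`, the snippet below builds the same `[tag, epochtime, data]` triple from a plain `Hash` and `Time`. It deliberately stops before serialization, so it needs no msgpack gem. `TIMESTAMP_KEY` and `fluent_entry` are hypothetical names, and `iso8601(3)` is an assumption meant to approximate the millisecond-precision string Logstash emits.

```ruby
require "time" # provides Time#iso8601

# Stand-in for LogStash::Event::TIMESTAMP, which is the "@timestamp" field name.
TIMESTAMP_KEY = "@timestamp"

# Hypothetical helper: build the [tag, epochtime, data] triple.
# The tag falls back to "log" when the event has no "tags" field, and
# merge leaves the caller's hash unmodified.
def fluent_entry(fields, timestamp)
  tag = fields["tags"] || "log"
  [tag, timestamp.to_i, fields.merge(TIMESTAMP_KEY => timestamp.utc.iso8601(3))]
end

fields    = { "your" => "data" }
timestamp = Time.at(1_700_000_000, 250_000) # 0.25 s past the whole second
tag, epochtime, data = fluent_entry(fields, timestamp)

puts tag                 # fallback tag, since fields has no "tags"
puts epochtime           # whole seconds only
puts data[TIMESTAMP_KEY] # ISO 8601 string that still carries the fraction
```

Note the asymmetry this makes visible: the entry-level time is truncated to whole seconds, while the timestamp string inside the record keeps its fractional part.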
#register ⇒ Object
    # File 'lib/logstash/codecs/fluent.rb', line 32
    def register
      require "msgpack"
      @decoder = MessagePack::Unpacker.new
    end