Class: LogStash::Codecs::Fluent
- Inherits:
- Base (hierarchy: Object, Base, LogStash::Codecs::Fluent)
- Extended by:
- PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
- Includes:
- PluginMixins::EventSupport::EventFactoryAdapter
- Defined in:
- lib/logstash/codecs/fluent.rb,
lib/logstash/codecs/fluent/event_time.rb
Overview
This codec handles Fluentd's msgpack schema.
For example, you can receive logs from `fluent-logger-ruby` with:
input {
  tcp {
    codec => fluent { nanosecond_precision => true }
    port => 4000
  }
}
And from the Ruby code in your own application:
logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
logger.post("some_tag", { "your" => "data", "here" => "yay!" })
Notes:
- To handle the EventTime msgpack extension, you must set the nanosecond_precision parameter to true.
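To make the note about EventTime more concrete, here is a minimal sketch of what a nanosecond-precision timestamp can look like on the wire. It assumes the 8-byte layout described in the Fluentd forward protocol (seconds, then nanoseconds, each an unsigned 32-bit big-endian integer). `SketchEventTime` and `event_time_from` are hypothetical names for illustration, not the plugin's own `EventTime` class, and a plain `Time` stands in for the event timestamp.

```ruby
# Hypothetical stand-in for an EventTime value; not the plugin's class.
SketchEventTime = Struct.new(:sec, :nsec) do
  # Serialize as two unsigned 32-bit big-endian integers (8 bytes total).
  # Assumption: this mirrors the Fluentd EventTime ext payload layout.
  def to_payload
    [sec, nsec].pack("NN")
  end

  # Rebuild the value from an 8-byte payload.
  def self.from_payload(bytes)
    new(*bytes.unpack("NN"))
  end
end

# Split a Time into whole seconds plus nanoseconds derived from microseconds.
def event_time_from(time)
  SketchEventTime.new(time.to_i, time.usec * 1000)
end

t = Time.at(1_700_000_000, 123_456, :usec)
et = event_time_from(t)
payload = et.to_payload
round_trip = SketchEventTime.from_payload(payload)
```

Without the nanosecond component, the timestamp collapses to whole seconds, which is why sub-second precision is only available when the extension type is enabled.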
Defined Under Namespace
Classes: EventTime
Instance Method Summary
- #decode(data, &block) ⇒ Object
- #encode(event) ⇒ Object
- #forwardable_tag(event) ⇒ Object
- #register ⇒ Object
Instance Method Details
#decode(data, &block) ⇒ Object
# File 'lib/logstash/codecs/fluent.rb', line 60
def decode(data, &block)
  @decoder.feed_each(data) do |item|
    decode_event(item, &block)
  end
end
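The `feed_each` pattern used by `#decode` matters because a TCP read can end in the middle of a message. The sketch below illustrates the idea with a swapped-in technique: a made-up length-prefixed framing (4-byte big-endian length, then the payload) instead of msgpack. `SketchStreamDecoder` and `frame` are hypothetical names and do not reflect how the msgpack unpacker is implemented.

```ruby
# Hypothetical streaming decoder. Frames are a 4-byte big-endian length
# followed by that many payload bytes (NOT msgpack; for illustration only).
class SketchStreamDecoder
  def initialize
    @buffer = String.new(encoding: Encoding::BINARY)
  end

  # Append a chunk and yield every frame that is now complete.
  # Incomplete trailing bytes stay buffered until the next call.
  def feed_each(chunk)
    @buffer << chunk.b
    loop do
      break if @buffer.bytesize < 4
      length = @buffer.byteslice(0, 4).unpack1("N")
      break if @buffer.bytesize < 4 + length
      item = @buffer.byteslice(4, length)
      @buffer = @buffer.byteslice(4 + length, @buffer.bytesize - 4 - length)
      yield item
    end
  end
end

# Build one frame: length header plus payload bytes.
def frame(payload)
  [payload.bytesize].pack("N") + payload.b
end

decoder = SketchStreamDecoder.new
stream = frame("first") + frame("second")
received = []
# Split the stream at an arbitrary byte offset, as a TCP read might.
[stream[0, 6], stream[6..]].each do |chunk|
  decoder.feed_each(chunk) { |item| received << item }
end
```

The first chunk yields nothing because its frame is incomplete; both items are delivered once the second chunk arrives.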
#encode(event) ⇒ Object
# File 'lib/logstash/codecs/fluent.rb', line 66
def encode(event)
  # Ensure tag to "tag1.tag2.tag3" style string.
  # Fluentd cannot handle Array class value in forward protocol's tag.
  tag = forwardable_tag(event)
  epochtime = if @nanosecond_precision
                EventTime.new(event..to_i, event..usec * 1000)
              else
                event..to_i
              end

  # use normalize to make sure returned Hash is pure Ruby for
  # MessagePack#pack which relies on pure Ruby object recognition
  data = LogStash::Util.normalize(event.to_hash)

  @packer.clear
  @on_event.call(event, @packer.pack([tag, epochtime, (data)]))
end
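As a rough illustration of the shape `#encode` packs, the following sketch assembles the `[tag, time, record]` triple from plain Ruby values. It makes several assumptions: a plain `Time` replaces the event timestamp, a two-element array replaces the `EventTime` object, and the msgpack packing step is left out. `sketch_forward_entry` is a hypothetical helper, not part of the plugin.

```ruby
# Hypothetical helper: build the [tag, time, record] triple before packing.
# With nanosecond precision the time is a [seconds, nanoseconds] pair
# (standing in for an EventTime); otherwise it is integer epoch seconds.
def sketch_forward_entry(tag, time, record, nanosecond_precision: false)
  epochtime = if nanosecond_precision
                [time.to_i, time.usec * 1000]
              else
                time.to_i
              end
  [tag, epochtime, record]
end

time = Time.at(1_700_000_000, 250_000, :usec)
record = { "message" => "hello" }

coarse = sketch_forward_entry("app.web", time, record)
precise = sketch_forward_entry("app.web", time, record, nanosecond_precision: true)
```

Here `coarse` carries only whole seconds, while `precise` keeps the sub-second part as nanoseconds.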
#forwardable_tag(event) ⇒ Object
# File 'lib/logstash/codecs/fluent.rb', line 84
def forwardable_tag(event)
  tag = event.get("tags") || "log"
  case tag
  when Array
    tag.join('.')
  when String
    tag
  else
    tag.to_s
  end
end
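The tag normalization above can be tried outside Logstash with a small stand-alone sketch. `sketch_forwardable_tag` is a hypothetical function that takes the raw `tags` value directly rather than an event object; the branching mirrors the method shown above.

```ruby
# Hypothetical stand-alone version: takes the "tags" value, not an event.
def sketch_forwardable_tag(tags)
  tag = tags || "log" # fall back to "log" when no tags are set
  case tag
  when Array
    tag.join(".")     # ["a", "b"] becomes "a.b"
  when String
    tag
  else
    tag.to_s
  end
end

from_array  = sketch_forwardable_tag(["app", "web", "access"])
from_string = sketch_forwardable_tag("app")
from_nil    = sketch_forwardable_tag(nil)
from_symbol = sketch_forwardable_tag(:app)
```

Joining arrays with dots produces the "tag1.tag2.tag3" style string that the forward protocol expects for a tag.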
#register ⇒ Object
# File 'lib/logstash/codecs/fluent.rb', line 49
def register
  require "msgpack"
  @factory = MessagePack::Factory.new
  if @nanosecond_precision
    @factory.register_type(EventTime::TYPE, EventTime)
  end
  @packer = @factory.packer
  @decoder = @factory.unpacker
  @packforward_decoder = nil
end