Class: LogStash::Codecs::Fluent

Inherits:
Base
  • Object
show all
Extended by:
PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
Includes:
PluginMixins::EventSupport::EventFactoryAdapter
Defined in:
lib/logstash/codecs/fluent.rb,
lib/logstash/codecs/fluent/event_time.rb

Overview

This codec handles fluentd’s msgpack schema.

For example, you can receive logs from ‘fluent-logger-ruby` with:

source,ruby

input {

tcp {
  codec => fluent {
    nanosecond_precision => true
  }
  port => 4000
}

}

And from your ruby code in your own application:

source,ruby

logger = Fluent::Logger::FluentLogger.new(nil, :host => “example.log”, :port => 4000) logger.post(“some_tag”, { “your” => “data”, “here” => “yay!” })

Notes:

  • to handle EventTime msgpack extension, you must specify nanosecond_precision parameter as true.

Defined Under Namespace

Classes: EventTime

Instance Method Summary collapse

Instance Method Details

#decode(data, &block) ⇒ Object



60
61
62
63
64
# File 'lib/logstash/codecs/fluent.rb', line 60

def decode(data, &block)
  @decoder.feed_each(data) do |item|
    decode_event(item, &block)
  end
end

#encode(event) ⇒ Object

def decode



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/logstash/codecs/fluent.rb', line 66

def encode(event)
  # Ensure tag to "tag1.tag2.tag3" style string.
  # Fluentd cannot handle Array class value in forward protocol's tag.
  tag = forwardable_tag(event)
  epochtime = if @nanosecond_precision
                EventTime.new(event.timestamp.to_i, event.timestamp.usec * 1000)
              else
                event.timestamp.to_i
              end

  # use normalize to make sure returned Hash is pure Ruby for
  # MessagePack#pack which relies on pure Ruby object recognition
  data = LogStash::Util.normalize(event.to_hash)

  @packer.clear
  @on_event.call(event, @packer.pack([tag, epochtime, normalize_timestamps(data)]))
end

#forwardable_tag(event) ⇒ Object

def encode



84
85
86
87
88
89
90
91
92
93
94
# File 'lib/logstash/codecs/fluent.rb', line 84

def forwardable_tag(event)
  tag = event.get("tags") || "log"
  case tag
  when Array
    tag.join('.')
  when String
    tag
  else
    tag.to_s
  end
end

#registerObject



49
50
51
52
53
54
55
56
57
58
# File 'lib/logstash/codecs/fluent.rb', line 49

def register
  require "msgpack"
  @factory = MessagePack::Factory.new
  if @nanosecond_precision
    @factory.register_type(EventTime::TYPE, EventTime)
  end
  @packer = @factory.packer
  @decoder = @factory.unpacker
  @packforward_decoder = nil
end