Class: Fluent::KestrelOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::KestrelOutput
- Defined in:
- lib/fluent/plugin/out_kestrel.rb
Instance Attribute Summary collapse
-
#kestrel ⇒ Object
readonly
Returns the value of attribute kestrel.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ KestrelOutput
constructor
A new instance of KestrelOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ KestrelOutput
Returns a new instance of KestrelOutput.
17 18 19 20 21 |
# File 'lib/fluent/plugin/out_kestrel.rb', line 17
#
# Builds a new KestrelOutput.
#
# Runs the parent constructor first, then loads the libraries this plugin
# relies on. The requires live inside the constructor so they are only
# evaluated when an instance is actually created.
def initialize
  super

  require 'kestrel'
  require 'time'
end
Instance Attribute Details
#kestrel ⇒ Object (readonly)
Returns the value of attribute kestrel.
4 5 6 |
# File 'lib/fluent/plugin/out_kestrel.rb', line 4
#
# Read-only accessor for the +@kestrel+ instance variable (assigned in #start).
#
# @return [Object] the current value of +@kestrel+
def kestrel
  @kestrel
end
Instance Method Details
#configure(conf) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/fluent/plugin/out_kestrel.rb', line 23
#
# Validates the plugin configuration and prepares formatting state.
#
# After delegating to the parent implementation this method:
# * insists that both +@queue+ and +@host+ are set,
# * builds the time formatter stored in +@timef+,
# * picks the field separator stored in +@f_separator+,
# * precomputes the "prefix plus dot" string and its length when
#   +@remove_prefix+ is set.
#
# @param conf [#[]] configuration object; only 'field_separator' is read here
# @raise [ConfigError] when +@queue+ or +@host+ is missing
def configure(conf)
  super

  if !@queue || !@host
    raise ConfigError, "[kestrel config error]:'host' and 'queue' option is required."
  end

  @timef = TimeFormatter.new(@time_format, @localtime)

  # Any value other than 'SPACE' or 'COMMA' (including none) means a tab.
  separators = { 'SPACE' => ' ', 'COMMA' => ',' }
  @f_separator = separators.fetch(conf['field_separator'], "\t")

  return unless @remove_prefix

  @remove_prefix_string = @remove_prefix + '.'
  @remove_prefix_length = @remove_prefix_string.length
end
#format(tag, time, record) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/fluent/plugin/out_kestrel.rb', line 53
#
# Serialises one event into a msgpack-encoded triple
# +[tag_str, time_str, record]+.
#
# When +@remove_prefix+ is configured and the tag starts with that prefix
# followed by a dot (and has at least one character after the dot), the
# prefix and the dot are stripped from the tag.
#
# @param tag [String] event tag
# @param time [Object] event time, handed to +@timef.format+
# @param record [Object] event payload, packed unchanged
# @return [Object] result of +Array#to_msgpack+ on the triple
def format(tag, time, record)
  # The previous condition read
  #   tag == @remove_prefix or @remove_prefix and (...)
  # `or` and `and` share one precedence level and associate left, so it was
  # evaluated as `(tag == @remove_prefix or @remove_prefix) and (...)`: the
  # exact-match clause never had any effect. The condition below is that
  # effective behaviour written explicitly; a tag equal to the bare prefix is
  # left untouched.
  if @remove_prefix &&
     tag.length > @remove_prefix_length &&
     tag.start_with?(@remove_prefix_string)
    tag = tag[@remove_prefix_length..-1]
  end

  time_str = @output_include_time ? @timef.format(time) + @f_separator : ''
  tag_str = @output_include_tag ? tag + @f_separator : ''

  # Order in the packed array is tag first, then time; #write relies on it.
  [tag_str, time_str, record].to_msgpack
end
#shutdown ⇒ Object
49 50 51 |
# File 'lib/fluent/plugin/out_kestrel.rb', line 49
#
# Stops the plugin. Adds nothing of its own: the call is forwarded
# unchanged to the parent implementation.
def shutdown
  super
end
#start ⇒ Object
43 44 45 46 47 |
# File 'lib/fluent/plugin/out_kestrel.rb', line 43
#
# Starts the plugin: runs the parent start-up, then creates the Kestrel
# client and stores it in +@kestrel+.
#
# The client is given a single "host:port" string built from +@host+ and
# +@port+.
def start
  super

  endpoint = @host + ":" + @port.to_s
  @kestrel = Kestrel::Client.new(endpoint)
end
#write(chunk) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluent/plugin/out_kestrel.rb', line 72
#
# Flushes one buffered chunk to Kestrel.
#
# The chunk is read as a msgpack stream of +[tag, time, record]+ triples (the
# layout produced by #format). Each triple becomes one message of the form
# "<time><tag><record as JSON>", which is stored on +@queue+ through
# +@kestrel.set+ together with +@ttl+ and +@raw+.
#
# @param chunk [#open] buffer chunk; +open+ must yield a readable IO
def write(chunk)
  chunk.open do |io|
    begin
      MessagePack::Unpacker.new(io).each do |tag, time, record|
        data = "#{time}#{tag}#{record.to_json}"
        # ttl and raw are positional arguments. The former `ttl=@ttl, raw=@raw`
        # spelling only assigned throwaway locals and passed the same values.
        @kestrel.set(@queue, data, @ttl, @raw)
      end
    rescue EOFError
      # EOFError always occurred when the end of the chunk was reached.
    end
  end
end