Class: Fluent::KestrelOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_kestrel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ KestrelOutput

Returns a new instance of KestrelOutput.



17
18
19
20
21
# File 'lib/fluent/plugin/out_kestrel.rb', line 17

# Builds the output plugin and loads the libraries it depends on.
#
# The parent initializer runs first; the libraries are then required from
# inside the constructor (not at file load time), in the order listed.
def initialize
  super
  %w[kestrel time].each { |library| require library }
end

Instance Attribute Details

#kestrel ⇒ Object (readonly)

Returns the value of attribute kestrel.



4
5
6
# File 'lib/fluent/plugin/out_kestrel.rb', line 4

# Read-only accessor for the @kestrel instance variable.
#
# #start assigns a Kestrel::Client to @kestrel, and #write sends data
# through it.
#
# @return the current value of @kestrel
def kestrel
  @kestrel
end

Instance Method Details

#configure(conf) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_kestrel.rb', line 23

# Validates the plugin settings and derives the values used by #format.
#
# After delegating to the parent class this method:
# * requires both @queue and @host to be set,
# * builds the time formatter from @time_format and @localtime,
# * maps the 'field_separator' setting ('SPACE', 'COMMA', anything else
#   means TAB) to the separator character,
# * precomputes "<remove_prefix>." and its length when @remove_prefix is set.
#
# @param conf configuration element; only conf['field_separator'] is read
#   directly here (the instance variables are presumably populated by the
#   parent class — confirm against the plugin's config_param declarations)
# @raise [ConfigError] when @queue or @host is missing
def configure(conf)
  super

  if !@queue || !@host
    raise ConfigError, "[kestrel config error]:'host' and 'queue' option is required."
  end

  @timef = TimeFormatter.new(@time_format, @localtime)

  # Any value other than the two named separators falls back to a tab.
  named_separators = { 'SPACE' => ' ', 'COMMA' => ',' }
  @f_separator = named_separators.fetch(conf['field_separator'], "\t")

  return unless @remove_prefix

  @remove_prefix_string = @remove_prefix + '.'
  @remove_prefix_length = @remove_prefix_string.length
end

#format(tag, time, record) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/out_kestrel.rb', line 53

# Serialises one event into the triple that #write later unpacks.
#
# When a remove prefix is configured and the tag is "<prefix>." followed by
# at least one more character, the "<prefix>." part is dropped. A tag that is
# exactly the bare prefix (or exactly "<prefix>.") is left unchanged, which
# avoids slicing the tag down to nil.
#
# @param tag [String] event tag
# @param time event time, handed to @timef.format when the time is included
# @param record event payload, serialised as-is
# @return the result of calling to_msgpack on [tag_str, time_str, record];
#   tag_str / time_str are '' when the corresponding output_include_* flag
#   is off, otherwise the value followed by the field separator
def format(tag, time, record)
  # Written with && on purpose: `or`/`and` share one precedence level, which
  # makes mixed conditions parse differently from how they read.
  strip_prefix = @remove_prefix &&
                 tag.start_with?(@remove_prefix_string) &&
                 tag.length > @remove_prefix_length
  tag = tag[@remove_prefix_length..-1] if strip_prefix

  time_str = @output_include_time ? @timef.format(time) + @f_separator : ''
  tag_str = @output_include_tag ? tag + @f_separator : ''

  [tag_str, time_str, record].to_msgpack
end

#shutdown ⇒ Object



49
50
51
# File 'lib/fluent/plugin/out_kestrel.rb', line 49

# Shutdown hook. Adds no behaviour of its own and simply defers to the
# parent class implementation.
#
# NOTE(review): the client stored in @kestrel by #start is not explicitly
# released here — confirm whether the client needs to be closed.
def shutdown
  super
end

#start ⇒ Object



43
44
45
46
47
# File 'lib/fluent/plugin/out_kestrel.rb', line 43

# Starts the plugin and creates the Kestrel client that #write sends to.
#
# The client is given a single "host:port" string built from @host and
# @port, and is stored in @kestrel.
def start
  super

  server_address = @host + ":#{@port}"
  @kestrel = Kestrel::Client.new(server_address)
end

#write(chunk) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/out_kestrel.rb', line 72

# Sends every event stored in a buffered chunk to the Kestrel queue.
#
# Each chunk entry is the [tag_str, time_str, record] triple produced by
# #format. It is flattened into "<time_str><tag_str><record as JSON>" and
# stored under @queue through the client in @kestrel.
#
# @param chunk buffer chunk; must respond to #open and yield an IO-like
#   object that MessagePack::Unpacker can read from
def write(chunk)
  chunk.open do |io|
    begin
      MessagePack::Unpacker.new(io).each do |tag, time, record|
        payload = "#{time}#{tag}#{record.to_json}"

        # TTL and raw flag are plain positional arguments of the client call.
        @kestrel.set(@queue, payload, @ttl, @raw)
      end
    rescue EOFError
      # EOFError always occurs when the end of the chunk is reached.
    end
  end
end