Class: Fluent::FlumeOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_flume.rb

Instance Method Summary collapse

Constructor Details

#initializeFlumeOutput

Returns a new instance of FlumeOutput.



33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/out_flume.rb', line 33

def initialize
  require 'thrift'
  $:.unshift File.join(File.dirname(__FILE__), 'thrift')
  require 'flume_types'
  require 'flume_constants'
  require 'thrift_source_protocol'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



42
43
44
45
46
# File 'lib/fluent/plugin/out_flume.rb', line 42

def configure(conf)
  # override default buffer_chunk_limit
  conf['buffer_chunk_limit'] ||= '1m'
  super
end

#format(tag, time, record) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/out_flume.rb', line 61

def format(tag, time, record)
  if @remove_prefix and
      ( (tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length) or
        tag == @remove_prefix)
    [(tag[@removed_length..-1] || @default_category), time, record].to_msgpack
  else
    [tag, time, record].to_msgpack
  end
end

#shutdownObject



57
58
59
# File 'lib/fluent/plugin/out_flume.rb', line 57

def shutdown
  super
end

#startObject



48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/out_flume.rb', line 48

def start
  super

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
end

#write(chunk) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_flume.rb', line 71

def write(chunk)
  socket = Thrift::Socket.new @host, @port, @timeout
  transport = Thrift::FramedTransport.new socket
  #protocol = Thrift::BinaryProtocol.new transport, false, false
  protocol = Thrift::CompactProtocol.new transport
  client = ThriftSourceProtocol::Client.new protocol

  count = 0
  transport.open
  log.debug "thrift client opened: #{client}"
  begin
    chunk.msgpack_each { |tag, time, record|
      entry = ThriftFlumeEvent.new(:body      => record.to_json.to_s.force_encoding('UTF-8'),
                                   :headers   => {
                                     'timestamp' => time.to_s,
                                     'tag'       => tag,
                                   })
      client.append entry
      count += 1
    }
    log.debug "Writing #{count} entries to flume"
  ensure
    log.debug "thrift client closing: #{client}"
    transport.close
  end
end