Class: Fluent::FlumeOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_flume.rb

Instance Method Summary collapse

Constructor Details

#initializeFlumeOutput

Returns a new instance of FlumeOutput.



39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/out_flume.rb', line 39

def initialize
  require 'thrift'
  $:.unshift File.join(File.dirname(__FILE__), 'thrift')
  require 'flume_types'
  require 'flume_constants'
  require 'thrift_source_protocol'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_flume.rb', line 48

def configure(conf)
  # override default buffer_chunk_limit
  conf['buffer_chunk_limit'] ||= '1m'

  super

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)
end

#format(tag, time, record) ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_flume.rb', line 71

def format(tag, time, record)
  if @remove_prefix and
      ((tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length) or tag == @remove_prefix)
    tag = (tag[@removed_length..-1] || @default_category)
  end
  fr = @formatter.format(tag, time, record)
  fr.chomp! if @trim_nl
  [tag, time, fr].to_msgpack
end

#shutdownObject



67
68
69
# File 'lib/fluent/plugin/out_flume.rb', line 67

def shutdown
  super
end

#startObject



58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_flume.rb', line 58

def start
  super

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
end

#write(chunk) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/out_flume.rb', line 81

def write(chunk)
  socket = Thrift::Socket.new @host, @port, @timeout
  transport = Thrift::FramedTransport.new socket
  #protocol = Thrift::BinaryProtocol.new transport, false, false
  protocol = Thrift::CompactProtocol.new transport
  client = ThriftSourceProtocol::Client.new protocol

  count = 0
  header = {}
  transport.open
  log.debug "thrift client opened: #{client}"
  begin
    chunk.msgpack_each { |tag, time, record|
      header['timestamp'.freeze] = time.to_s
      header['tag'.freeze] = tag
      entry = ThriftFlumeEvent.new(:body    => record.force_encoding('UTF-8'),
                                   :headers => header)
      client.append entry
      count += 1
    }
    log.debug "Writing #{count} entries to flume"
  ensure
    log.debug "thrift client closing: #{client}"
    transport.close
  end
end