Class: Fluent::BigObjectOutput_AVRO

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_bigobject_avro.rb

Defined Under Namespace

Classes: TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BigObjectOutput_AVRO

end class



109
110
111
112
113
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 109

# Builds the plugin instance.
#
# Runs the parent constructor first, then loads the +avro+ library (the
# require happens here rather than at file load time) and logs an
# info-level message.
def initialize
  super
  require 'avro'
  log.info('bigobject_avro initialize')
end

Instance Attribute Details

#tables ⇒ Object

Returns the value of attribute tables.



13
14
15
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 13

# Reader for the +@tables+ instance variable.
#
# @return [Object] the current value of +@tables+ (#configure assigns it an
#   array holding one TableElement per <table> configuration section)
def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 115

# Applies the plugin configuration.
#
# After delegating to the parent implementation this method:
# * compiles the optional +remove_tag_prefix+ setting into a regexp stored in
#   +@remove_tag_prefix+ (left untouched when the setting is absent),
# * resets +@default_table+ to nil,
# * builds one TableElement per child element named +table+, configures it
#   with that element, and stores the list in +@tables+ in declaration order.
#
# @param conf [#[], #elements] configuration object; +conf['remove_tag_prefix']+
#   is read as a string and +conf.elements+ is expected to yield objects that
#   respond to +name+
# @return [Array] the configured table elements (also available via +@tables+)
def configure(conf)
  super

  prefix = conf['remove_tag_prefix']
  # \A anchors at the very start of the tag. The line anchor ^ would also
  # match right after a newline embedded in the tag.
  @remove_tag_prefix = /\A#{Regexp.escape(prefix)}/ if prefix

  @default_table = nil

  table_sections = conf.elements.select { |element| element.name == 'table' }
  @tables = table_sections.map do |section|
    TableElement.new(log, @bigobject_hostname, @bigobject_port).tap do |table|
      table.configure(section)
    end
  end
end

#emit(tag, es, chain) ⇒ Object



173
174
175
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 173

# Forwards an event stream to the parent +emit+, adding a fourth argument
# derived from the tag via #format_tag.
#
# NOTE(review): the fourth argument is presumably the buffer chunk key
# (#write matches +chunk.key+ against table patterns) - confirm against the
# parent class.
#
# @param tag [String] event tag
# @param es [Object] event stream, passed through unchanged
# @param chain [Object] passed through unchanged
# @return [Object] whatever the parent implementation returns
def emit(tag, es, chain)
  formatted_tag = format_tag(tag)
  super(tag, es, chain, formatted_tag)
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd.



146
147
148
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 146

# Serializes one event for buffering.
#
# The tag, time and record are packed, in that order, into a three-element
# array which is then converted with +to_msgpack+.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of +Array#to_msgpack+ on +[tag, time, record]+
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#format_tag(tag) ⇒ Object



165
166
167
168
169
170
171
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 165

# Strips the configured prefix from a tag.
#
# When +@remove_tag_prefix+ is unset the tag is returned as-is (the same
# object); otherwise every match of that pattern is replaced with an empty
# string and the resulting new string is returned.
#
# @param tag [String] event tag
# @return [String] the tag without the configured prefix
def format_tag(tag)
  return tag unless @remove_tag_prefix

  tag.gsub(@remove_tag_prefix, '')
end

#shutdown ⇒ Object



141
142
143
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 141

# Shutdown hook for the plugin. Adds no behavior of its own: it only
# delegates to the parent class implementation via +super+.
def shutdown
  super
end

#start ⇒ Object



136
137
138
139
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 136

# Start hook for the plugin.
#
# Delegates to the parent implementation first, then records an info-level
# log message noting that the plugin has started.
def start
  super
  log.info('bigobject_avro start')
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events.



153
154
155
156
157
158
159
160
161
162
163
# File 'lib/fluent/plugin/out_bigobject_avro.rb', line 153

# Flushes one buffer chunk to the matching table.
#
# The chunk is handed to the first entry of +@tables+ whose +mpattern+
# matches +chunk.key+. When no table matches, the chunk is not sent
# anywhere; only a warning naming the chunk key is logged.
#
# @param chunk [#key] buffer chunk to flush
# @return [Object] the result of the table's +send_binary+ on a match,
#   otherwise whatever +log.warn+ returns
def write(chunk)
  table = @tables.find { |t| t.mpattern.match(chunk.key) }
  return table.send_binary(chunk) if table

  log.warn("unknown chunk #{chunk.key}")
end