Class: Fluent::BigObjectOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_bigobject.rb

Defined Under Namespace

Classes: TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BigObjectOutput

end class



189
190
191
192
193
194
195
# File 'lib/fluent/plugin/out_bigobject.rb', line 189

# Builds the output plugin instance.
#
# Runs the parent constructor first, then loads the libraries this plugin
# relies on (in the listed order) and logs an info line.
def initialize
  super
  # Loaded here rather than at file load time, so the libraries are only
  # pulled in when an instance is actually created.
  %w[rest-client json avro].each { |library| require library }
  log.info("bigobject initialize")
end

Instance Attribute Details

#tables ⇒ Object

Returns the value of attribute tables.



13
14
15
# File 'lib/fluent/plugin/out_bigobject.rb', line 13

# Reader for the @tables instance variable.
#
# @return [Object] whatever is currently stored in @tables
#   (nil if it has never been assigned)
def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/fluent/plugin/out_bigobject.rb', line 197

# Configures the plugin from its parsed configuration.
#
# After running the parent implementation this method:
# * compiles the optional 'remove_tag_prefix' setting into a regexp that
#   matches that literal text at the very beginning of a tag, and
# * builds one TableElement for every child element named 'table',
#   configures it with that element and collects it in @tables.
#
# @param conf [Object] configuration object; it must respond to #[] and
#   #elements (presumably a Fluentd config element -- confirm with caller)
def configure(conf)
  super

  prefix = conf['remove_tag_prefix']
  # \A anchors to the start of the whole string. The previous '^' anchor
  # also matched after every newline, so the "prefix" could be stripped
  # from the middle of a tag.
  @remove_tag_prefix = /\A#{Regexp.escape(prefix)}/ if prefix

  @tables = []
  @default_table = nil

  table_sections = conf.elements.select { |element| element.name == 'table' }
  table_sections.each do |element|
    table = TableElement.new(log, @bigobject_hostname, @bigobject_port)
    table.configure(element)
    @tables << table
  end
end

#emit(tag, es, chain) ⇒ Object



255
256
257
# File 'lib/fluent/plugin/out_bigobject.rb', line 255

# Forwards an event stream to the parent emit, adding the tag with its
# configured prefix removed (see format_tag) as a fourth argument.
#
# @param tag [String] event tag
# @param es [Object] event stream, passed through untouched
# @param chain [Object] passed through untouched
# @return [Object] whatever the parent emit returns
def emit(tag, es, chain)
  # NOTE(review): presumably the parent uses this as the buffer chunk key,
  # since write matches tables on chunk.key -- confirm against the base class.
  stripped_tag = format_tag(tag)
  super(tag, es, chain, stripped_tag)
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd.



228
229
230
# File 'lib/fluent/plugin/out_bigobject.rb', line 228

# Serializes one event for buffering.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of calling to_msgpack on the three-element
#   array [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#format_tag(tag) ⇒ Object



247
248
249
250
251
252
253
# File 'lib/fluent/plugin/out_bigobject.rb', line 247

# Strips the configured prefix pattern from a tag.
#
# @param tag [String] event tag
# @return [String] the tag itself when @remove_tag_prefix is not set,
#   otherwise a copy with every match of @remove_tag_prefix removed
def format_tag(tag)
  return tag unless @remove_tag_prefix

  tag.gsub(@remove_tag_prefix, '')
end

#shutdown ⇒ Object



223
224
225
# File 'lib/fluent/plugin/out_bigobject.rb', line 223

# Shutdown hook. Adds no behavior of its own; it only delegates to the
# parent implementation via super.
def shutdown
  super
end

#start ⇒ Object



218
219
220
221
# File 'lib/fluent/plugin/out_bigobject.rb', line 218

# Start hook. Runs the parent implementation first, then logs
# "bigobject start" at info level.
def start
  super
  log.info("bigobject start")
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events.



235
236
237
238
239
240
241
242
243
244
245
# File 'lib/fluent/plugin/out_bigobject.rb', line 235

# Hands a buffer chunk to the first configured table whose pattern matches
# the chunk's key.
#
# Tables are tried in the order they appear in @tables. When none matches,
# a warning is logged and the chunk is not sent anywhere.
#
# @param chunk [Object] buffer chunk; must respond to #key
# @return [Object] the result of the matching table's send(chunk), or the
#   result of log.warn when no table matches
def write(chunk)
  table = @tables.find { |candidate| candidate.mpattern.match(chunk.key) }

  # NOTE(review): send is called with the chunk itself, so it is presumably
  # the table's own method rather than Object#send dispatch -- confirm
  # against TableElement.
  return table.send(chunk) if table

  log.warn("unknown chunk #{chunk.key}")
end