Class: Fluent::BigObjectOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_bigobject.rb

Defined Under Namespace

Classes: TableElement

Constant Summary collapse

DEFAULT_TAG_FORMAT =

DEFAULT_TAG_FORMAT = /(?<table_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$/

/^(?<table_name>[^\.]+)\.(?<event>[^\.]+)(\.(?<primary_key>[^\.]+))*/

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BigObjectOutput

end class



177
178
179
180
181
182
# File 'lib/fluent/plugin/out_bigobject.rb', line 177

# Builds the output plugin instance.
#
# Runs the superclass constructor first, then loads the 'rest-client' and
# 'json' libraries (in that order) and logs an informational message.
def initialize
  super
  %w[rest-client json].each { |library| require library }
  log.info('bigobject initialize')
end

Instance Attribute Details

#tables ⇒ Object

Returns the value of attribute tables.



16
17
18
# File 'lib/fluent/plugin/out_bigobject.rb', line 16

# Reader for the +tables+ attribute.
#
# @return [Object] the current value of the @tables instance variable
def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/fluent/plugin/out_bigobject.rb', line 184

# Applies the plugin configuration.
#
# After delegating to the superclass this method:
# * compiles conf['remove_tag_prefix'] (when present) into a regexp that
#   matches that literal prefix at the start of a line;
# * resolves @tag_format to DEFAULT_TAG_FORMAT, or to a regexp compiled from
#   conf['tag_format'] when a different format has been set;
# * resets @tables / @default_table and creates one configured TableElement
#   per <table> child element of +conf+.
#
# @param conf the configuration element; it is read with conf[...] and
#   conf.elements
def configure(conf)
  super

  prefix = conf['remove_tag_prefix']
  # Left untouched when no prefix is configured.
  @remove_tag_prefix = Regexp.new("^#{Regexp.escape(prefix)}") if prefix

  custom_format = !@tag_format.nil? && @tag_format != DEFAULT_TAG_FORMAT
  @tag_format = custom_format ? Regexp.new(conf['tag_format']) : DEFAULT_TAG_FORMAT

  @default_table = nil
  @tables = []

  table_sections = conf.elements.select { |element| element.name == 'table' }
  table_sections.each do |section|
    table = TableElement.new(log, @bigobject_hostname, @bigobject_port, @tag_format)
    table.configure(section)
    @tables << table
  end
end

#emit(tag, es, chain) ⇒ Object



252
253
254
255
# File 'lib/fluent/plugin/out_bigobject.rb', line 252

# Rewrites the tag with format_tag and hands the event stream to the
# superclass implementation.
#
# The rewritten tag is passed twice: as the tag and as the fourth argument.
# NOTE(review): the fourth argument is presumably the buffer chunk key, since
# #write reads the tag back from chunk.key -- confirm against the superclass.
#
# @param tag [String] the incoming event tag
# @param es the event stream, forwarded unchanged
# @param chain the output chain, forwarded unchanged
def emit(tag, es, chain)
  normalized_tag = format_tag(tag)
  super(normalized_tag, es, chain, normalized_tag)
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd.



221
222
223
# File 'lib/fluent/plugin/out_bigobject.rb', line 221

# Serializes a single event.
#
# The tag, time and record are packed together, in that order, as one
# three-element array and encoded with Array#to_msgpack.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling to_msgpack on [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#format_tag(tag) ⇒ Object



244
245
246
247
248
249
250
# File 'lib/fluent/plugin/out_bigobject.rb', line 244

# Removes the configured tag prefix from +tag+.
#
# When @remove_tag_prefix is not set the tag is returned as is; otherwise
# every match of that regexp is replaced with an empty string.
#
# @param tag [String] the tag to rewrite
# @return [String] the tag with the prefix removed, or the original tag
def format_tag(tag)
  return tag unless @remove_tag_prefix

  tag.gsub(@remove_tag_prefix, '')
end

#shutdown ⇒ Object



216
217
218
# File 'lib/fluent/plugin/out_bigobject.rb', line 216

# Shutdown hook. This override adds no behavior of its own; it only
# delegates to the superclass implementation.
def shutdown
  super
end

#start ⇒ Object



211
212
213
214
# File 'lib/fluent/plugin/out_bigobject.rb', line 211

# Start hook. Delegates to the superclass implementation first, then logs
# an informational "bigobject start" message.
def start
  super
  log.info("bigobject start")
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events.



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/fluent/plugin/out_bigobject.rb', line 228

# Flushes one buffer chunk to the table configured for its tag.
#
# The chunk key is treated as the tag and matched against @tag_format; the
# 'table_name' capture selects the first entry of @tables whose mpattern
# matches it, and the chunk is passed to that table's send method.
#
# A chunk is only logged as a warning (and nothing is sent) when:
# * its tag does not match @tag_format at all -- previously this raised
#   NoMethodError because the nil match result was indexed; or
# * no configured table matches the extracted table name.
#
# @param chunk a buffer chunk; it must respond to +key+
# @return the result of the matching table's send(chunk), or the result of
#   log.warn when the chunk could not be routed
def write(chunk)
  tag = chunk.key
  tag_parts = tag.match(@tag_format)

  # String#match returns nil for a tag that does not fit the format
  # (e.g. a tag with no dot under the default format); such a chunk can
  # never be routed, so report it like any other unknown chunk.
  unless tag_parts
    return log.warn("unknown chunk #{tag} (tag does not match tag_format)")
  end

  target_table = tag_parts['table_name']
  table = @tables.find { |candidate| candidate.mpattern.match(target_table) }
  return table.send(chunk) if table

  log.warn("unknown chunk #{chunk.key}")
end