Class: Fluent::BigObjectOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_bigobject.rb

Defined Under Namespace

Classes: TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BigObjectOutput

end class



125
126
127
128
129
130
# File 'lib/fluent/plugin/out_bigobject.rb', line 125

# Builds the plugin instance: runs the parent initializer, loads the
# 'rest-client' and 'json' libraries, and logs that initialization ran.
def initialize
  super
  # The libraries are required here, at instantiation time, rather than at
  # the top of the file.
  require 'rest-client'
  require 'json'
  log.info('bigobject initialize')
end

Instance Attribute Details

#tables ⇒ Object

Returns the value of attribute tables.



11
12
13
# File 'lib/fluent/plugin/out_bigobject.rb', line 11

# Reader for the @tables instance variable.
#
# @return [Object] the current value of @tables (nil until it is assigned)
def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/fluent/plugin/out_bigobject.rb', line 132

# Applies the plugin configuration.
#
# After delegating to the parent implementation this method:
# * compiles the optional 'remove_tag_prefix' setting into a regexp that
#   matches the literal prefix at the start of a line, and
# * builds one TableElement for every child element named 'table',
#   collecting them in @tables in configuration order.
#
# @param conf [Object] configuration object; must respond to #[] and #elements
# @return [Array] the child elements named 'table'
def configure(conf)
  super

  prefix = conf['remove_tag_prefix']
  # @remove_tag_prefix is only assigned when the setting is present; it is
  # left untouched otherwise.
  @remove_tag_prefix = Regexp.new("^#{Regexp.escape(prefix)}") if prefix

  @default_table = nil
  @tables = []

  table_sections = conf.elements.select { |section| section.name == 'table' }
  table_sections.each do |section|
    table = TableElement.new(log)
    table.configure(section)
    @tables << table
  end
end

#emit(tag, es, chain) ⇒ Object



196
197
198
# File 'lib/fluent/plugin/out_bigobject.rb', line 196

# Forwards the event stream to the parent emit, adding the tag as processed
# by format_tag as a fourth argument.
#
# NOTE(review): the fourth argument is presumably the buffer chunk key that
# write later matches against each table's pattern - confirm against the
# parent class.
def emit(tag, es, chain)
  formatted_tag = format_tag(tag)
  super(tag, es, chain, formatted_tag)
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd.



165
166
167
168
# File 'lib/fluent/plugin/out_bigobject.rb', line 165

# Serializes one event for buffering.
#
# The event is packed as the three-element array [tag, time, record] using
# Array#to_msgpack.
#
# @return [Object] whatever to_msgpack returns for the event array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#format_tag(tag) ⇒ Object



188
189
190
191
192
193
194
# File 'lib/fluent/plugin/out_bigobject.rb', line 188

# Strips the configured prefix from a tag.
#
# When @remove_tag_prefix is unset the tag is returned as-is; otherwise every
# match of that regexp in the tag is replaced with the empty string.
#
# @param tag [String] the tag to process
# @return [String] the tag without the prefix
def format_tag(tag)
  return tag unless @remove_tag_prefix

  tag.gsub(@remove_tag_prefix, '')
end

#shutdown ⇒ Object



158
159
160
161
# File 'lib/fluent/plugin/out_bigobject.rb', line 158

# Runs the parent shutdown, then logs that shutdown was reached.
def shutdown
  super
  log.info('bigobject shutdown')
end

#start ⇒ Object



153
154
155
156
# File 'lib/fluent/plugin/out_bigobject.rb', line 153

# Runs the parent start, then logs that start was reached.
def start
  super
  log.info('bigobject start')
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events.



173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/fluent/plugin/out_bigobject.rb', line 173

# Hands a buffer chunk to the first configured table whose pattern matches
# the chunk's key.
#
# Tables are tried in the order they appear in @tables. When one matches, an
# info line is logged and the chunk is passed to that table together with
# @bigobject_url. When none matches, only a warning is logged and the chunk
# is not passed to any table.
#
# @param chunk [Object] buffer chunk; must respond to #key
# @return [Object] the result of the table's send call, or of log.warn when
#   no table matches
def write(chunk)
  key = chunk.key
  table = @tables.find { |candidate| candidate.mpattern.match(key) }
  return log.warn("unknown chunk #{key}") unless table

  log.info("add known chunk #{key}")
  # NOTE(review): called with (url, chunk), so this is presumably a send
  # method defined by the table class rather than Object#send - confirm.
  table.send(@bigobject_url, chunk)
end