Class: Fluent::BigObjectOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::BigObjectOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_bigobject.rb
Defined Under Namespace
Classes: TableElement
Constant Summary collapse
- DEFAULT_TAG_FORMAT =
DEFAULT_TAG_FORMAT = /(?<table_name>).(?<event>).(?<primary_key>+)$/
/^(?<table_name>[^\.]+)\.(?<event>[^\.]+)(\.(?<primary_key>[^\.]+))*/
Instance Attribute Summary collapse
-
#tables ⇒ Object
Returns the value of attribute tables.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd.
- #format_tag(tag) ⇒ Object
-
#initialize ⇒ BigObjectOutput
constructor
end class.
- #shutdown ⇒ Object
- #start ⇒ Object
-
#write(chunk) ⇒ Object
This method is called every flush interval.
Constructor Details
#initialize ⇒ BigObjectOutput
end class
177 178 179 180 181 182 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 177 def initialize super require 'rest-client' require 'json' log.info("bigobject initialize") end |
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
16 17 18 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 16 def tables @tables end |
Instance Method Details
#configure(conf) ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 184 def configure(conf) super if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end if @tag_format.nil? || @tag_format == DEFAULT_TAG_FORMAT @tag_format = DEFAULT_TAG_FORMAT else @tag_format = Regexp.new(conf['tag_format']) end @tables = [] @default_table = nil conf.elements.select { |e| e.name == 'table' }.each { |e| te = TableElement.new(log, @bigobject_hostname, @bigobject_port, @tag_format) te.configure(e) @tables << te } # @tables.each {|t| puts t.to_s} end |
#emit(tag, es, chain) ⇒ Object
252 253 254 255 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 252 def emit(tag, es, chain) nt = format_tag(tag) super(nt, es, chain, nt) end |
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd.
221 222 223 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 221 def format(tag, time, record) [tag, time, record].to_msgpack end |
#format_tag(tag) ⇒ Object
244 245 246 247 248 249 250 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 244 def format_tag(tag) if @remove_tag_prefix tag.gsub(@remove_tag_prefix, '') else tag end end |
#shutdown ⇒ Object
216 217 218 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 216 def shutdown super end |
#start ⇒ Object
211 212 213 214 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 211 def start super log.info("bigobject start") end |
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events.
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 228 def write(chunk) unknownChunks = [] tag = chunk.key tag_parts = tag.match(@tag_format) target_table = tag_parts['table_name'] @tables.each { |table| if table.mpattern.match(target_table) return table.send(chunk) end } log.warn("unknown chunk #{chunk.key}") end |