Class: Fluent::BigObjectOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::BigObjectOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_bigobject.rb
Defined Under Namespace
Classes: TableElement
Instance Attribute Summary collapse
-
#tables ⇒ Object
Returns the value of attribute tables.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd.
- #format_tag(tag) ⇒ Object
-
#initialize ⇒ BigObjectOutput
constructor
Creates the output plugin instance.
- #shutdown ⇒ Object
- #start ⇒ Object
-
#write(chunk) ⇒ Object
This method is called every flush interval.
Constructor Details
#initialize ⇒ BigObjectOutput
Calls the parent constructor, loads the rest-client and json libraries, and logs an informational message.
125 126 127 128 129 130 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 125
# Builds the output plugin instance.
#
# Runs the parent constructor first, then loads the 'rest-client' and 'json'
# libraries (in that order) and writes an informational log line.
def initialize
  super
  %w[rest-client json].each { |library| require library }
  log.info("bigobject initialize")
end
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
11 12 13 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 11
# Reader for the +@tables+ instance variable.
#
# @return [Object] whatever is currently stored in +@tables+ (nil when it
#   has not been assigned yet).
def tables
  @tables
end
Instance Method Details
#configure(conf) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 132
# Reads the plugin configuration.
#
# After delegating to the parent implementation this method:
# * compiles the optional 'remove_tag_prefix' setting into a regexp stored
#   in +@remove_tag_prefix+ (left untouched when the setting is absent);
# * resets +@default_table+ to nil;
# * builds one TableElement per child element named 'table', configures it
#   with that element, and collects them in +@tables+ in document order.
#
# @param conf configuration object; must respond to +[]+ (setting lookup)
#   and +elements+ (child sections, each responding to +name+).
def configure(conf)
  super

  prefix = conf['remove_tag_prefix']
  # \A anchors at the very start of the tag. The previous '^' anchor is a
  # line anchor in Ruby and would also match after an embedded newline.
  @remove_tag_prefix = Regexp.new('\A' + Regexp.escape(prefix)) if prefix

  @tables = []
  @default_table = nil

  conf.elements.each do |element|
    next unless element.name == 'table'

    table = TableElement.new(log)
    table.configure(element)
    @tables << table
  end
end
#emit(tag, es, chain) ⇒ Object
196 197 198 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 196
# Hands the event stream to the parent implementation, passing the result
# of +format_tag(tag)+ as an additional fourth argument.
#
# @param tag the event tag
# @param es the event stream, forwarded unchanged
# @param chain forwarded unchanged
def emit(tag, es, chain)
  formatted = format_tag(tag)
  super(tag, es, chain, formatted)
end
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd.
165 166 167 168 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 165
# Serializes one event for buffering.
#
# The tag, time and record are packed together as a three-element array and
# converted with +to_msgpack+.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on +[tag, time, record]+
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#format_tag(tag) ⇒ Object
188 189 190 191 192 193 194 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 188
# Removes the configured prefix from a tag.
#
# @param tag [String] the tag to process
# @return [String] +tag+ itself when +@remove_tag_prefix+ is not set;
#   otherwise a copy with every match of +@remove_tag_prefix+ removed.
def format_tag(tag)
  return tag unless @remove_tag_prefix

  tag.gsub(@remove_tag_prefix, '')
end
#shutdown ⇒ Object
158 159 160 161 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 158
# Runs the parent shutdown logic, then writes an informational log line.
def shutdown
  super
  message = "bigobject shutdown"
  log.info(message)
end
#start ⇒ Object
153 154 155 156 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 153
# Runs the parent start logic, then writes an informational log line.
def start
  super
  message = "bigobject start"
  log.info(message)
end
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events.
173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/fluent/plugin/out_bigobject.rb', line 173
# Dispatches a buffer chunk to the first configured table whose +mpattern+
# matches the chunk's key.
#
# When a table matches, an info line is logged and the chunk is handed to
# that table via +table.send(@bigobject_url, chunk)+; the result of that
# call is returned. When no table matches, a warning is logged instead.
#
# NOTE(review): +send+ here is presumably a method defined by TableElement
# (shadowing Object#send) — confirm against the TableElement class.
#
# @param chunk buffer chunk; must respond to +key+
# @return the result of the matching table's +send+, or of +log.warn+ when
#   no table matches
def write(chunk)
  key = chunk.key
  table = @tables.find { |candidate| candidate.mpattern.match(key) }

  if table
    log.info("add known chunk #{key}")
    return table.send(@bigobject_url, chunk)
  end

  log.warn("unknown chunk #{key}")
end