Class: Fluent::ODPSOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::ODPSOutput
- Defined in:
- lib/fluent/plugin/out_aliyun_odps.rb
Defined Under Namespace
Classes: TableElement
Constant Summary collapse
- @@txt =
nil
Instance Attribute Summary collapse
-
#tables ⇒ Object
Returns the value of attribute tables.
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
This method is called before starting.
- #emit(tag, es, chain) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd.
- #format_tag(tag) ⇒ Object
-
#initialize ⇒ ODPSOutput
constructor
A new instance of ODPSOutput.
-
#shutdown ⇒ Object
This method is called when shutting down.
-
#start ⇒ Object
This method is called when starting.
-
#write(chunk) ⇒ Object
This method is called every flush interval.
Constructor Details
#initialize ⇒ ODPSOutput
Returns a new instance of ODPSOutput.
24 25 26 27 28 29 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 24 def initialize super require 'time' require_relative 'stream_client' @compressor = nil end |
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
40 41 42 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 40 def tables @tables end |
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 290 def configure(conf) super print "configure" # You can also refer raw parameter via conf[name]. @tables = [] conf.elements.select { |e| e.name == 'table' }.each { |e| te = TableElement.new(e.arg, log) te.configure(e) if e.arg.empty? log.warn "no table definition" else @tables << te end } if @tables.empty? raise ConfigError, "There is no <table>. <table> is required" end end |
#emit(tag, es, chain) ⇒ Object
370 371 372 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 370 def emit(tag, es, chain) super(tag, es, chain, format_tag(tag)) end |
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd. Convert the event to a raw string.
349 350 351 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 349 def format(tag, time, record) [tag, time, record].to_json + "\n" end |
#format_tag(tag) ⇒ Object
378 379 380 381 382 383 384 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 378 def format_tag(tag) if @remove_tag_prefix tag.gsub(@remove_tag_prefix, '') else tag end end |
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
340 341 342 343 344 345 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 340 def shutdown super @tables.reject! do |te| te.close() end end |
#start ⇒ Object
This method is called when starting. Open sockets or files here.
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 313 def start super config = { :aliyun_access_id => @aliyun_access_id, :aliyun_access_key => @aliyun_access_key, :project => @project, :aliyun_odps_endpoint => @aliyun_odps_endpoint, :aliyun_odps_hub_endpoint => @aliyun_odps_hub_endpoint, } #init Global setting if (@enable_fast_crc) OdpsDatahub::OdpsConfig::setFastCrc(true) begin OdpsDatahub::CrcCalculator::calculate(StringIO.new("")) rescue => e raise e.to_s end end #初始化各个table object @tables.each { |te| te.init(config) } log.info "the table object size is "+@tables.size.to_s end |
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
360 361 362 363 364 365 366 367 368 |
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 360 def write(chunk) #foreach tables,choose table oject ,data = chunk.read @tables.each { |table| if table.pattern.match(chunk.key) log.info "Begin to import the data and the table_match is "+chunk.key return table.import(chunk) end } end |