Class: Fluent::ODPSOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::ODPSOutput
- Defined in: lib/fluent/plugin/out_aliyun_odps.rb
Defined Under Namespace
Classes: TableElement
Constant Summary
- @@txt = nil
Instance Attribute Summary
- #tables ⇒ Object
  Returns the value of attribute tables.
Instance Method Summary
- #configure(conf) ⇒ Object
  This method is called before starting.
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
  This method is called when an event reaches Fluentd.
- #format_tag(tag) ⇒ Object
- #initialize ⇒ ODPSOutput (constructor)
  A new instance of ODPSOutput.
- #shutdown ⇒ Object
  This method is called when shutting down.
- #start ⇒ Object
  This method is called when starting.
- #write(chunk) ⇒ Object
  This method is called every flush interval.
Constructor Details
#initialize ⇒ ODPSOutput
Returns a new instance of ODPSOutput.
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 24
def initialize
  super
  require 'time'
  require_relative 'stream_client'
  @compressor = nil
end
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 40
def tables
  @tables
end
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 281
def configure(conf)
  super
  print "configure"
  # You can also refer to raw parameters via conf[name].
  @tables = []
  conf.elements.select { |e|
    e.name == 'table'
  }.each { |e|
    te = TableElement.new(e.arg, log)
    te.configure(e)
    if e.arg.empty?
      log.warn "no table definition"
    else
      @tables << te
    end
  }
  if @tables.empty?
    raise ConfigError, "There is no <table>. <table> is required"
  end
end
#emit(tag, es, chain) ⇒ Object
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 361
def emit(tag, es, chain)
  super(tag, es, chain, format_tag(tag))
end
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd. It converts the event to a raw string.
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 340
def format(tag, time, record)
  [tag, time, record].to_json + "\n"
end
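To illustrate the line format that #format produces, here is a minimal standalone sketch that evaluates the same expression outside Fluentd. The helper name format_event and the sample tag, time, and record are made up for this example; only the `[tag, time, record].to_json + "\n"` expression is taken from the source above.

```ruby
require 'json'

# Hypothetical stand-in for ODPSOutput#format: same expression, no Fluentd needed.
def format_event(tag, time, record)
  [tag, time, record].to_json + "\n"
end

# Sample values are illustrative only.
line = format_event('odps.access', 1_700_000_000, { 'user' => 'alice', 'status' => 200 })
puts line

# Each buffered line parses back into a three-element array.
tag, time, record = JSON.parse(line)
```

Because every event ends with a newline, a chunk that holds several events can be split on line boundaries and each line parsed independently.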
#format_tag(tag) ⇒ Object
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 369
def format_tag(tag)
  if @remove_tag_prefix
    tag.gsub(@remove_tag_prefix, '')
  else
    tag
  end
end
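The behavior of #format_tag depends on what @remove_tag_prefix holds, which this page does not show. The sketch below is a hypothetical stand-in (strip_tag, with the prefix passed as an argument) that exercises the same gsub call with both a String and an anchored Regexp, assuming either could be configured.

```ruby
# Hypothetical stand-in for ODPSOutput#format_tag; the prefix is an argument
# here instead of the @remove_tag_prefix instance variable.
def strip_tag(tag, remove_tag_prefix)
  if remove_tag_prefix
    tag.gsub(remove_tag_prefix, '')
  else
    tag
  end
end

unchanged = strip_tag('odps.access', nil)                # no prefix configured
stripped  = strip_tag('odps.access', 'odps.')            # literal String pattern
repeated  = strip_tag('odps.odps.access', 'odps.')       # String pattern hits every occurrence
anchored  = strip_tag('odps.odps.access', /\Aodps\./)    # anchored Regexp hits only the start
```

With a String pattern, gsub removes every occurrence of the text, not only a leading one, so a tag that repeats the prefix further along loses those segments as well. An anchored Regexp limits the removal to the start of the tag.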
#shutdown ⇒ Object
This method is called when shutting down. Shut down the thread and close sockets or files here.
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 331
def shutdown
  super
  @tables.reject! do |te|
    te.close()
  end
end
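Since #shutdown calls close inside reject!, a table is dropped from @tables only when its close call returns a truthy value. The sketch below shows that interaction with a hypothetical StubTable; what TableElement#close actually returns is not documented on this page, so the return values here are assumptions chosen to show both outcomes.

```ruby
# Hypothetical stand-in for TableElement; close returns a preset value.
StubTable = Struct.new(:name, :close_result) do
  def close
    close_result
  end
end

tables = [
  StubTable.new('a', true),   # truthy result: removed by reject!
  StubTable.new('b', nil),    # falsy result: kept
  StubTable.new('c', false),  # falsy result: kept
]

tables.reject! { |te| te.close }
remaining = tables.map(&:name)
```

In this sketch close is still called on every element; only the removal from the array depends on the return value.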
#start ⇒ Object
This method is called when starting. Open sockets or files here.
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 304
def start
  super
  config = {
    :aliyun_access_id => @aliyun_access_id,
    :aliyun_access_key => @aliyun_access_key,
    :project => @project,
    :aliyun_odps_endpoint => @aliyun_odps_endpoint,
    :aliyun_odps_hub_endpoint => @aliyun_odps_hub_endpoint,
  }
  # Initialize global settings.
  if (@enable_fast_crc)
    OdpsDatahub::OdpsConfig::setFastCrc(true)
    begin
      OdpsDatahub::CrcCalculator::calculate(StringIO.new(""))
    rescue => e
      raise e.to_s
    end
  end
  # Initialize each table object.
  @tables.each { |te|
    te.init(config)
  }
  log.info "the table object size is "+@tables.size.to_s
end
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by an internal thread, not Fluentd’s main thread, so I/O waits do not affect other plugins.
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 351
def write(chunk)
  # For each table, choose the matching table object; data = chunk.read
  @tables.each { |table|
    if table.pattern.match(chunk.key)
      log.info "Begin to import the data and the table_match is "+chunk.key
      return table.import(chunk)
    end
  }
end
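The dispatch in #write can be sketched in isolation as follows. DemoTable, DemoChunk, and dispatch are hypothetical stand-ins, and a plain Regexp stands in for whatever matcher TableElement#pattern really returns (this page does not say). The chunk key is the tag as rewritten by #format_tag, since #emit passes that value to super.

```ruby
# Hypothetical stand-ins for TableElement and a buffer chunk.
DemoTable = Struct.new(:name, :pattern) do
  def import(chunk)
    "#{name} imported #{chunk.key}"
  end
end
DemoChunk = Struct.new(:key)

# Same control flow as ODPSOutput#write: the first matching table imports the
# chunk, and the early return skips every later table.
def dispatch(tables, chunk)
  tables.each do |table|
    return table.import(chunk) if table.pattern.match(chunk.key)
  end
end

tables = [
  DemoTable.new('access_log', /\Aaccess/),
  DemoTable.new('catch_all', /.*/),
]

first_hit = dispatch(tables, DemoChunk.new('access.web'))  # both match; first wins
fallback  = dispatch(tables, DemoChunk.new('error.web'))   # only catch_all matches
no_match  = dispatch(tables.first(1), DemoChunk.new('error.web'))
```

Two properties follow from this structure: the order of the table definitions decides which one receives a chunk when several patterns match, and when nothing matches the method falls through and returns the table array itself without importing anything.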