Class: Fluent::ODPSOutput
- Inherits: BufferedOutput
  (Object > BufferedOutput > Fluent::ODPSOutput)
- Defined in: lib/fluent/plugin/out_aliyun_odps.rb
Defined Under Namespace
Classes: TableElement
Constant Summary
- @@txt = nil
Instance Attribute Summary
- #tables ⇒ Object
  Returns the value of attribute tables.
Instance Method Summary
- #configure(conf) ⇒ Object
  This method is called before starting.
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
  This method is called when an event reaches Fluentd.
- #format_tag(tag) ⇒ Object
- #initialize ⇒ ODPSOutput (constructor)
  A new instance of ODPSOutput.
- #shutdown ⇒ Object
  This method is called when shutting down.
- #start ⇒ Object
  This method is called when starting.
- #write(chunk) ⇒ Object
  This method is called every flush interval.
Constructor Details
#initialize ⇒ ODPSOutput
Returns a new instance of ODPSOutput.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 24
    def initialize
      super
      require 'time'
      require_relative 'stream_client'
      @compressor = nil
    end
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 40
    def tables
      @tables
    end
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 279
    def configure(conf)
      super
      print "configure"
      # You can also refer raw parameter via conf[name].
      @tables = []
      conf.elements.select { |e|
        e.name == 'table'
      }.each { |e|
        te = TableElement.new(e.arg, log)
        te.configure(e)
        if e.arg.empty?
          log.warn "no table definition"
        else
          @tables << te
        end
      }
      if @tables.empty?
        raise ConfigError, "There is no <table>. <table> is required"
      end
    end
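The table-collection rule in #configure can be sketched without Fluentd loaded. This is only an illustration under stated assumptions: `Element`, `ConfigError`, and `collect_tables` are hypothetical stand-ins defined here, and the sketch stores the element argument instead of building a TableElement.

```ruby
# Hypothetical stand-ins; the real plugin uses Fluent::Config::Element,
# Fluent::ConfigError and TableElement.
class ConfigError < StandardError; end
Element = Struct.new(:name, :arg)

# Keep the argument of every <table> element that has one,
# warn about <table> elements with an empty argument,
# and fail if nothing usable is left.
def collect_tables(elements)
  tables = []
  elements.select { |e| e.name == 'table' }.each do |e|
    if e.arg.empty?
      warn "no table definition"
    else
      tables << e.arg
    end
  end
  raise ConfigError, "There is no <table>. <table> is required" if tables.empty?
  tables
end

elements = [
  Element.new('table', 'in.log'),  # kept
  Element.new('table', ''),        # skipped with a warning
  Element.new('store', 'other')    # ignored: not a <table>
]
p collect_tables(elements)
```

Note that a configuration containing only `<table>` elements with empty arguments ends in the same ConfigError as one with no `<table>` at all.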
#emit(tag, es, chain) ⇒ Object
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 359
    def emit(tag, es, chain)
      super(tag, es, chain, format_tag(tag))
    end
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd. It converts the event to a raw string.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 338
    def format(tag, time, record)
      [tag, time, record].to_json + "\n"
    end
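To make the buffered line format concrete, here is a small standalone sketch of the same expression. The function name `format_event` and the sample tag, timestamp, and record are made up for illustration; only the `[tag, time, record].to_json + "\n"` expression comes from the listing.

```ruby
require 'json'

# Same expression as ODPSOutput#format: one JSON array per event,
# terminated by a newline so events can be split again later.
def format_event(tag, time, record)
  [tag, time, record].to_json + "\n"
end

line = format_event('nginx.access', 1700000000, { 'code' => 200, 'method' => 'GET' })
print line

# Reading one line back yields the original three values.
tag, time, record = JSON.parse(line)
p tag, time, record
```

Because each event occupies exactly one line, a chunk can be decoded by splitting on newlines and parsing each line as JSON.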
#format_tag(tag) ⇒ Object
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 367
    def format_tag(tag)
      if @remove_tag_prefix
        tag.gsub(@remove_tag_prefix, '')
      else
        tag
      end
    end
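The following sketch shows how the `gsub` call in #format_tag behaves for different prefix values. It assumes the prefix may be a String or a Regexp (the listing does not say which), and `strip_tag` is a hypothetical helper that takes the prefix as an argument instead of reading `@remove_tag_prefix`.

```ruby
# Same branching as ODPSOutput#format_tag, with the prefix passed explicitly.
def strip_tag(tag, remove_tag_prefix)
  if remove_tag_prefix
    tag.gsub(remove_tag_prefix, '')
  else
    tag
  end
end

p strip_tag('in.nginx.access', 'in.')      # prefix removed
p strip_tag('in.nginx.access', nil)        # no prefix configured: tag unchanged
# gsub removes every occurrence, not only the leading one:
p strip_tag('in.login.log', 'in.')         # the "in." inside "login." is removed too
# An anchored Regexp (an assumption, not the plugin's documented usage)
# limits the removal to the start of the tag:
p strip_tag('in.login.log', /\Ain\./)
```

If tags can contain the prefix text elsewhere, an anchored pattern or `String#delete_prefix` would avoid the unintended removals shown in the third call.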
#shutdown ⇒ Object
This method is called when shutting down. Shut down threads and close sockets or files here.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 329
    def shutdown
      super
      @tables.reject! do |te|
        te.close()
      end
    end
#start ⇒ Object
This method is called when starting. Open sockets or files here.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 302
    def start
      super
      config = {
        :aliyun_access_id => @aliyun_access_id,
        :aliyun_access_key => @aliyun_access_key,
        :project => @project,
        :aliyun_odps_endpoint => @aliyun_odps_endpoint,
        :aliyun_odps_hub_endpoint => @aliyun_odps_hub_endpoint,
      }
      #init Global setting
      if (@enable_fast_crc)
        OdpsDatahub::OdpsConfig::setFastCrc(true)
        begin
          OdpsDatahub::CrcCalculator::calculate(StringIO.new(""))
        rescue => e
          raise e.to_s
        end
      end
      # Initialize each table object
      @tables.each { |te|
        te.init(config)
      }
      log.info "the table object size is "+@tables.size.to_s
    end
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by an internal thread, not Fluentd’s main thread, so IO waits do not affect other plugins.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 349
    def write(chunk)
      #foreach tables,choose table object ,data = chunk.read
      @tables.each { |table|
        if table.pattern.match(chunk.key)
          log.info "Begin to import the data and the table_match is "+chunk.key
          return table.import(chunk)
        end
      }
    end
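The routing in #write is first-match-wins: the chunk goes to the first table whose pattern matches the chunk key, and later tables are never consulted. A minimal sketch of that behaviour follows, assuming plain Regexp patterns. `Table` and `route` are hypothetical names used only here; the real `table.pattern` is whatever TableElement builds, which this listing does not show.

```ruby
# Hypothetical stand-in for TableElement: a name, a pattern, and an import method.
Table = Struct.new(:name, :pattern) do
  def import(chunk_key)
    "imported #{chunk_key} into #{name}"
  end
end

# Walk the tables in configuration order and hand the chunk to the first match.
# Unlike the original method, this sketch returns nil when nothing matches.
def route(tables, chunk_key)
  tables.each do |table|
    return table.import(chunk_key) if table.pattern.match(chunk_key)
  end
  nil
end

tables = [
  Table.new('nginx_log', /\Anginx\./),
  Table.new('catch_all', /.*/)
]

p route(tables, 'nginx.access')  # handled by nginx_log, catch_all is never tried
p route(tables, 'syslog.auth')   # falls through to catch_all
```

Since order decides the outcome, more specific `<table>` patterns would need to be declared before broader ones.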