Class: Fluent::ODPSOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::ODPSOutput
- Defined in: lib/fluent/plugin/out_aliyun_odps.rb
Defined Under Namespace
Classes: TableElement
Constant Summary
- @@txt = nil
Instance Attribute Summary
- #tables ⇒ Object
  Returns the value of attribute tables.
Instance Method Summary
- #configure(conf) ⇒ Object
  This method is called before starting.
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
  This method is called when an event reaches Fluentd.
- #format_tag(tag) ⇒ Object
- #initialize ⇒ ODPSOutput (constructor)
  A new instance of ODPSOutput.
- #shutdown ⇒ Object
  This method is called when shutting down.
- #start ⇒ Object
  This method is called when starting.
- #write(chunk) ⇒ Object
  This method is called every flush interval.
Constructor Details
#initialize ⇒ ODPSOutput
Returns a new instance of ODPSOutput.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 24
    def initialize
      super
      require 'time'
      require_relative 'stream_client'
      @compressor = nil
    end
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 39
    def tables
      @tables
    end
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 270
    def configure(conf)
      super
      print "configure"
      # You can also refer to raw parameters via conf[name].
      @tables = []
      conf.elements.select { |e| e.name == 'table' }.each { |e|
        te = TableElement.new(e.arg, log)
        te.configure(e)
        if e.arg.empty?
          log.warn "no table definition"
        else
          @tables << te
        end
      }
      if @tables.empty?
        raise ConfigError, "There is no <table>. <table> is required"
      end
    end
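The validation in #configure can be tried outside Fluentd with a small stand-in. The sketch below is only an illustration: `Element` and `collect_table_args` are hypothetical names, `ArgumentError` stands in for `Fluent::ConfigError`, and no `TableElement` objects are built.

```ruby
# Hypothetical stand-in for a Fluentd config element; only `name` and `arg` are modeled.
Element = Struct.new(:name, :arg)

# Keep the argument of every <table> element, skip tables whose argument is empty,
# and fail when no usable table remains (the same rule #configure enforces).
def collect_table_args(elements)
  args = elements.select { |e| e.name == 'table' }
                 .reject { |e| e.arg.empty? }
                 .map(&:arg)
  raise ArgumentError, 'There is no <table>. <table> is required' if args.empty?
  args
end

elements = [
  Element.new('table', 'app.access'), # kept
  Element.new('buffer', ''),          # ignored: not a <table>
  Element.new('table', '')            # skipped: empty argument
]
puts collect_table_args(elements).inspect
```

Under these assumptions only `app.access` survives, and a configuration whose tables all have empty arguments raises an error instead of starting with nothing to write to.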
#emit(tag, es, chain) ⇒ Object
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 341
    def emit(tag, es, chain)
      super(tag, es, chain, format_tag(tag))
    end
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd. Convert the event to a raw string.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 320
    def format(tag, time, record)
      [tag, time, record].to_json + "\n"
    end
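To see what the serialized form looks like, the same expression can be run on its own. In this sketch `format_event` is a hypothetical free-standing copy of the method body, and the tag, time, and record values are made up for illustration.

```ruby
require 'json'

# Same expression as #format: one JSON array per event, terminated by a newline.
def format_event(tag, time, record)
  [tag, time, record].to_json + "\n"
end

line = format_event('app.access', 1_700_000_000, { 'status' => 200 })
puts line

# Each line can be decoded back into its three parts.
tag, time, record = JSON.parse(line)
puts tag
puts time
puts record['status']
```

Because every event ends with a newline, a chunk holding several events can be split on line boundaries and each line parsed independently.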
#format_tag(tag) ⇒ Object
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 349
    def format_tag(tag)
      if @remove_tag_prefix
        tag.gsub(@remove_tag_prefix, '')
      else
        tag
      end
    end
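Since #format_tag relies on `String#gsub`, the result depends on what `@remove_tag_prefix` holds. This page does not show how that value is built, so the sketch below passes it in explicitly; `strip_tag` is a hypothetical helper, and both the plain string and the anchored regular expression are assumptions chosen to show the difference.

```ruby
# Same logic as #format_tag, with the prefix passed in instead of read from an ivar.
def strip_tag(tag, remove_tag_prefix)
  return tag unless remove_tag_prefix
  tag.gsub(remove_tag_prefix, '')
end

# A plain String pattern is removed wherever it occurs, not only at the start.
puts strip_tag('odps.app.odps.log', 'odps.')      # prints "app.log"

# An anchored Regexp removes the leading occurrence only.
puts strip_tag('odps.app.odps.log', /\Aodps\./)   # prints "app.odps.log"

# Without a prefix the tag is returned unchanged.
puts strip_tag('app.log', nil)                    # prints "app.log"
```

If only the leading prefix should be dropped, an anchored pattern like the second call is the safer choice.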
#shutdown ⇒ Object
This method is called when shutting down. Shut down the thread and close sockets or files here.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 311
    def shutdown
      super
      @tables.reject! do |te|
        te.close()
      end
    end
#start ⇒ Object
This method is called when starting. Open sockets or files here.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 293
    def start
      super
      config = {
        :aliyun_access_id => @aliyun_access_id,
        :aliyun_access_key => @aliyun_access_key,
        :project => @project,
        :aliyun_odps_endpoint => @aliyun_odps_endpoint,
        :aliyun_odps_hub_endpoint => @aliyun_odps_hub_endpoint,
      }
      # Initialize each table object
      @tables.each { |te|
        te.init(config)
      }
      log.info "the table object size is "+@tables.size.to_s
    end
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by an internal thread, not Fluentd’s main thread, so I/O waits do not affect other plugins.
    # File 'lib/fluent/plugin/out_aliyun_odps.rb', line 331
    def write(chunk)
      # Iterate over the tables and pick the one whose pattern matches the chunk key; the events are available via chunk.read
      @tables.each { |table|
        if table.pattern.match(chunk.key)
          log.info "Begin to import the data and the table_match is "+chunk.key
          return table.import(chunk)
        end
      }
    end
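The dispatch in #write is first-match-wins: the chunk goes to the first table whose pattern matches the chunk key, and the loop stops there. A minimal sketch of that behavior follows. `Table`, `Chunk`, and `dispatch` are hypothetical stand-ins rather than the plugin's classes, the `import` body only counts lines, and unlike the original (which falls through to the value of `@tables.each`) the sketch returns `nil` when nothing matches.

```ruby
# Hypothetical stand-in for a table element: a name plus a Regexp tested against the chunk key.
Table = Struct.new(:name, :pattern) do
  def import(chunk)
    "#{name} imported #{chunk.read.lines.size} event(s)"
  end
end

# Hypothetical stand-in for a buffer chunk: a key (the tag) and newline-delimited data.
Chunk = Struct.new(:key, :data) do
  def read
    data
  end
end

# Hand the chunk to the first table whose pattern matches; later tables are never consulted.
def dispatch(tables, chunk)
  tables.each do |table|
    return table.import(chunk) if table.pattern.match(chunk.key)
  end
  nil # assumption: signal "no table matched" explicitly
end

tables = [Table.new('access', /\Aapp\./), Table.new('metrics', /\Asys\./)]
puts dispatch(tables, Chunk.new('sys.cpu', "event one\nevent two\n"))
```

Because the first match wins, the order of the `<table>` sections matters whenever two patterns can match the same key.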