Module: Fluent::BigQueryOutput::LoadImplementation
- Defined in:
- lib/fluent/plugin/out_bigquery.rb
Instance Method Summary collapse
- #_write(chunk, table_id_format, _) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #load(chunk, table_id) ⇒ Object
Instance Method Details
#_write(chunk, table_id_format, _) ⇒ Object
484 485 486 487 488 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 484

# Resolves the concrete table id for this buffer chunk and delegates the
# actual upload to #load. The third positional argument is ignored.
#
# @param chunk [Object] buffered chunk to be loaded
# @param table_id_format [String] strftime/placeholder pattern for the table id
# @param _ [Object] unused
def _write(chunk, table_id_format, _)
  current_time = Time.at(Fluent::Engine.now)
  # No record is available at this point, hence the nil argument.
  load(chunk, generate_table_id(table_id_format, current_time, nil, chunk))
end
#format(tag, time, record) ⇒ Object
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 465

# Serializes one event into a newline-terminated JSON row suitable for a
# BigQuery load job. Returns an empty string when the formatted row is empty.
#
# @param tag [String] event tag (unused here but part of the Fluentd format API)
# @param time [Integer, Fluent::EventTime] event timestamp
# @param record [Hash] the event record
# @return [String] JSON line for the row, or an empty string
def format(tag, time, record)
  # Lazily pull the schema from the configured table, if any.
  fetch_schema if @fetch_schema_table

  record = replace_record_key(record) if @replace_record_key
  record = convert_hash_to_json(record) if @convert_hash_to_json

  # @add_time_field injects the configured time field before field coercion.
  row = @fields.format(@add_time_field.call(record, time))
  return String.new if row.empty?

  MultiJson.dump(row) << "\n"
end
#load(chunk, table_id) ⇒ Object
490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 490

# Submits the chunk as a BigQuery load job. Retryable API errors are
# re-raised so Fluentd's retry machinery takes over; non-retryable errors
# are routed to the secondary output when one is configured, and otherwise
# swallowed. NOTE(review): this method name shadows Kernel#load within the
# plugin, and `res` is assigned but never read — presumably intentional;
# preserved as-is.
#
# @param chunk [Object] buffered chunk to upload
# @param table_id [String] fully resolved destination table id
def load(chunk, table_id)
  res = nil

  create_upload_source(chunk) do |upload_source|
    res = writer.create_load_job(chunk.unique_id, @project, @dataset, table_id, upload_source, @fields)
  end
rescue Fluent::BigQuery::Error => e
  # Transient failures propagate so the buffer is retried.
  raise e if e.retryable?

  # Permanent failures fall through to the secondary output, if any.
  flush_secondary(@secondary) if @secondary
end