Module: Fluent::BigQueryOutput::LoadImplementation
- Defined in: lib/fluent/plugin/out_bigquery.rb
Instance Method Summary
- #_write(chunk, table_id_format, _) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #load(chunk, table_id) ⇒ Object
Instance Method Details
#_write(chunk, table_id_format, _) ⇒ Object
# File 'lib/fluent/plugin/out_bigquery.rb', line 449

def _write(chunk, table_id_format, _)
  now = Time.at(Fluent::Engine.now)
  table_id = generate_table_id(table_id_format, now, nil, chunk)
  load(chunk, table_id)
end
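The table name is resolved once per flushed chunk: generate_table_id expands table_id_format against the chunk's flush time, and the chunk is then handed to #load. A minimal, purely illustrative sketch of a strftime-style expansion (an assumption about the common case; the actual generate_table_id is defined elsewhere in this file and may do more):

# Hypothetical values; only a plain strftime expansion is illustrated here.
table_id_format = "accesslog_%Y%m%d"
now = Time.utc(2017, 3, 14)
table_id = now.strftime(table_id_format)  # => "accesslog_20170314"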
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/out_bigquery.rb', line 430

def format(tag, time, record)
  fetch_schema if @fetch_schema_table

  if @replace_record_key
    record = replace_record_key(record)
  end
  if @convert_hash_to_json
    record = convert_hash_to_json(record)
  end

  buf = String.new
  row = @fields.format(@add_time_field.call(record, time))
  unless row.empty?
    buf << MultiJson.dump(row) + "\n"
  end
  buf
end
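Each record is rendered through the configured schema (@fields) and appended to the buffer as one newline-delimited JSON line; the buffered chunk is what #load later uploads via create_upload_source. A small sketch of that NDJSON shape with a hypothetical row (the real field layout depends on the configured schema and on add_time_field):

require 'multi_json'

# Hypothetical row; the real keys come from the configured BigQuery schema.
row = { "time" => 1489449600, "status" => 200, "path" => "/index.html" }

buf = String.new
buf << MultiJson.dump(row) + "\n"
# buf => "{\"time\":1489449600,\"status\":200,\"path\":\"/index.html\"}\n"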
#load(chunk, table_id) ⇒ Object
# File 'lib/fluent/plugin/out_bigquery.rb', line 455

def load(chunk, table_id)
  res = nil

  if @prevent_duplicate_load
    job_id = create_job_id(chunk, @dataset, table_id, @fields.to_a, @max_bad_records, @ignore_unknown_values)
  else
    job_id = nil
  end

  create_upload_source(chunk) do |upload_source|
    res = writer.create_load_job(@project, @dataset, table_id, upload_source, job_id, @fields, {
      ignore_unknown_values: @ignore_unknown_values,
      max_bad_records: @max_bad_records,
      timeout_sec: @request_timeout_sec,
      open_timeout_sec: @request_open_timeout_sec,
      auto_create_table: @auto_create_table,
      time_partitioning_type: @time_partitioning_type,
      time_partitioning_expiration: @time_partitioning_expiration
    })
  end
rescue Fluent::BigQuery::Error => e
  if e.retryable?
    raise e
  elsif @secondary
    flush_secondary(@secondary)
  end
end
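When prevent_duplicate_load is enabled, the job id is derived deterministically from the chunk and the load destination, so a retried flush resubmits under the same id and BigQuery can deduplicate the job; retryable errors are re-raised for Fluentd's retry handling, while non-retryable errors are handed to the configured secondary output. A hypothetical sketch of such a deterministic id (create_job_id itself is defined elsewhere in this file; the hashing shown here is an assumption, not the plugin's implementation):

require 'digest'

# Hypothetical helper, not the plugin's create_job_id: it only shows how a
# stable id could be built from the chunk contents plus the load options.
def example_job_id(chunk_data, dataset, table_id, schema, max_bad_records, ignore_unknown_values)
  digest = Digest::SHA1.hexdigest(
    [chunk_data, dataset, table_id, schema.to_s, max_bad_records, ignore_unknown_values].join("/")
  )
  "fluentd_job_#{digest}"
end

example_job_id("ndjson chunk bytes", "yourdataset", "accesslog_20170314", [], 0, false)
# => "fluentd_job_<sha1 hex digest>"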