Module: Fluent::BigQueryOutput::LoadImplementation
- Defined in:
- lib/fluent/plugin/out_bigquery.rb
Instance Method Summary collapse
- #_write(chunk, table_id_format) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #load(chunk, table_id) ⇒ Object
Instance Method Details
#_write(chunk, table_id_format) ⇒ Object
451 452 453 454 |
# Buffered-output entry point for the load implementation: resolves the
# concrete table id for this chunk (expanding the table_id_format against
# the current engine time), then hands the chunk off to #load.
#
# @param chunk [Fluent::BufferChunk] buffered events to upload
# @param table_id_format [String] table id pattern to expand
def _write(chunk, table_id_format)
  now = Time.at(Fluent::Engine.now)
  target_table = generate_table_id(table_id_format, now, nil, chunk)
  load(chunk, target_table)
end
#format(tag, time, record) ⇒ Object
438 439 440 441 442 443 444 445 446 447 448 449 |
# Serializes one event into a newline-terminated JSON record suitable for
# a NEWLINE_DELIMITED_JSON load job. Optionally rewrites record keys first
# (when @replace_record_key is set) and merges in the time field via
# @add_time_field before formatting through @fields.
#
# @param tag [String] event tag (unused here, part of the Fluent contract)
# @param time [Integer] event timestamp
# @param record [Hash] the event payload
# @return [String] one JSON line ending in "\n", or "" when the formatted
#   row comes out empty
def format(tag, time, record)
  record = replace_record_key(record) if @replace_record_key
  row = @fields.format(@add_time_field.call(record, time))
  # Empty rows produce no output, matching the original buffer behavior.
  return '' if row.empty?
  MultiJson.dump(row) + "\n"
end
#load(chunk, table_id) ⇒ Object
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 |
# Uploads the chunk to BigQuery as a load job (WRITE_APPEND, NDJSON) and
# then waits for the job to complete via #wait_load.
#
# @param chunk [Fluent::BufferChunk] buffered NDJSON events to upload
# @param table_id [String] fully resolved destination table id
def load(chunk, table_id)
  destination = {
    project_id: @project,
    dataset_id: @dataset,
    table_id: table_id,
  }
  load_config = {
    destination_table: destination,
    schema: { fields: @fields.to_a },
    write_disposition: "WRITE_APPEND",
    source_format: "NEWLINE_DELIMITED_JSON",
  }

  job = nil
  # create_upload_source yields an IO over the chunk; the job is submitted
  # while that source is open so the client can stream the upload.
  create_upload_source(chunk) do |upload_source|
    job = client.insert_job(
      @project,
      { configuration: { load: load_config } },
      { upload_source: upload_source, content_type: "application/octet-stream" }
    )
  end
  wait_load(job, table_id)
end