Module: Fluent::BigQueryOutput::LoadImplementation

Defined in:
lib/fluent/plugin/out_bigquery.rb

Instance Method Summary

Instance Method Details

#_write(chunk, table_id_format, _) ⇒ Object



484
485
486
487
488
# File 'lib/fluent/plugin/out_bigquery.rb', line 484

# Resolves the destination table id for a chunk and hands the chunk to #load.
#
# The table id is produced by generate_table_id from +table_id_format+, the
# engine's current time (Fluent::Engine.now wrapped in a Time), a nil row
# argument, and the chunk itself.
#
# @param chunk the buffered chunk to load
# @param table_id_format the table id template passed to generate_table_id
# @param _ ignored
# @return whatever #load returns
def _write(chunk, table_id_format, _)
  flushed_at = Time.at(Fluent::Engine.now)
  load(chunk, generate_table_id(table_id_format, flushed_at, nil, chunk))
end

#format(tag, time, record) ⇒ Object



465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
# File 'lib/fluent/plugin/out_bigquery.rb', line 465

# Turns one event record into a newline-terminated serialized row.
#
# Steps, in order:
# 1. calls fetch_schema when @fetch_schema_table is set;
# 2. passes the record through replace_record_key when @replace_record_key is set;
# 3. passes the record through convert_hash_to_json when @convert_hash_to_json is set;
# 4. invokes @add_time_field with the record and time, then formats the
#    result with @fields.format;
# 5. dumps the row with MultiJson and appends "\n".
#
# @param tag unused by this method
# @param time the event time handed to @add_time_field
# @param record the event record
# @return [String] the serialized row followed by "\n", or an empty string
#   when the formatted row is empty
def format(tag, time, record)
  fetch_schema if @fetch_schema_table

  record = replace_record_key(record) if @replace_record_key
  record = convert_hash_to_json(record) if @convert_hash_to_json

  line = String.new
  row = @fields.format(@add_time_field.call(record, time))
  # An empty row produces no output at all, not even the trailing newline.
  line << MultiJson.dump(row) << "\n" unless row.empty?
  line
end

#load(chunk, table_id) ⇒ Object



490
491
492
493
494
495
496
497
498
499
500
501
502
# File 'lib/fluent/plugin/out_bigquery.rb', line 490

# Submits a chunk to the given table as a load job.
#
# create_upload_source yields an upload source for the chunk, which is passed
# to writer.create_load_job together with the chunk's unique id, @project,
# @dataset, the table id and @fields.
#
# Error handling for Fluent::BigQuery::Error:
# * retryable errors are re-raised unchanged;
# * non-retryable errors are handed off via flush_secondary(@secondary) when
#   @secondary is set;
# * otherwise the error is swallowed and nil is returned.
#
# @param chunk the buffered chunk to upload (must respond to unique_id)
# @param table_id the destination table id
# @return the value of create_upload_source on success
# @raise [Fluent::BigQuery::Error] when the error reports itself as retryable
def load(chunk, table_id)
  create_upload_source(chunk) do |upload_source|
    writer.create_load_job(chunk.unique_id, @project, @dataset, table_id, upload_source, @fields)
  end
rescue Fluent::BigQuery::Error => e
  # Re-raise the same exception so the caller sees the retryable failure.
  raise if e.retryable?

  # NOTE(review): with no @secondary a non-retryable error is dropped without
  # any trace here -- confirm this is intended or add logging at the call site.
  flush_secondary(@secondary) if @secondary
end