Module: Fluent::BigQueryOutput::LoadImplementation

Defined in:
lib/fluent/plugin/out_bigquery.rb

Instance Method Summary

Instance Method Details

#_write(chunk, table_id_format, _) ⇒ Object



449
450
451
452
453
# File 'lib/fluent/plugin/out_bigquery.rb', line 449

# Resolves the destination table id for the chunk and hands it to #load.
#
# The table id is produced by `generate_table_id` from the given format,
# the current engine time (`Fluent::Engine.now` converted to a Time), a
# nil row argument, and the chunk itself.
#
# @param chunk the buffered chunk to upload
# @param table_id_format the table id format passed to `generate_table_id`
# @param _ ignored
# @return whatever #load returns
def _write(chunk, table_id_format, _)
  current_time = Time.at(Fluent::Engine.now)
  load(chunk, generate_table_id(table_id_format, current_time, nil, chunk))
end

#format(tag, time, record) ⇒ Object



430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/fluent/plugin/out_bigquery.rb', line 430

# Serializes one event record into a newline-terminated JSON line.
#
# Steps, in order:
# 1. calls `fetch_schema` when @fetch_schema_table is set;
# 2. passes the record through `replace_record_key` when
#    @replace_record_key is set;
# 3. passes the record through `convert_hash_to_json` when
#    @convert_hash_to_json is set;
# 4. runs the record and time through @add_time_field, then formats the
#    result with @fields.
#
# @param tag not used by this method
# @param time event time, forwarded to @add_time_field
# @param record the event record
# @return [String] the formatted row dumped with MultiJson followed by
#   "\n", or an empty string when the formatted row is empty
def format(tag, time, record)
  fetch_schema if @fetch_schema_table

  record = replace_record_key(record) if @replace_record_key
  record = convert_hash_to_json(record) if @convert_hash_to_json

  line = String.new
  formatted_row = @fields.format(@add_time_field.call(record, time))
  # An empty row yields an empty string.
  return line if formatted_row.empty?

  line << MultiJson.dump(formatted_row) + "\n"
end

#load(chunk, table_id) ⇒ Object



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# File 'lib/fluent/plugin/out_bigquery.rb', line 455

# Submits the chunk to BigQuery as a load job through `writer`.
#
# When @prevent_duplicate_load is set, a job id is derived with
# `create_job_id` from the chunk, dataset, table id, schema fields and
# the bad-record / unknown-value settings; otherwise no job id is passed.
# The upload source yielded by `create_upload_source` is handed to
# `writer.create_load_job` together with the request options.
#
# Error handling for Fluent::BigQuery::Error:
# - retryable errors are re-raised;
# - non-retryable errors are handed to `flush_secondary` when @secondary
#   is configured, and are otherwise swallowed (the method returns nil).
#
# @param chunk the buffered chunk to upload
# @param table_id the destination table id
# @return the value of `create_upload_source` on success, the value of
#   `flush_secondary` on a non-retryable error with a secondary, else nil
# @raise [Fluent::BigQuery::Error] when the error reports itself retryable
def load(chunk, table_id)
  job_id =
    if @prevent_duplicate_load
      create_job_id(chunk, @dataset, table_id, @fields.to_a, @max_bad_records, @ignore_unknown_values)
    end

  create_upload_source(chunk) do |upload_source|
    load_options = {
      ignore_unknown_values: @ignore_unknown_values,
      max_bad_records: @max_bad_records,
      timeout_sec: @request_timeout_sec,
      open_timeout_sec: @request_open_timeout_sec,
      auto_create_table: @auto_create_table,
      time_partitioning_type: @time_partitioning_type,
      time_partitioning_expiration: @time_partitioning_expiration
    }
    writer.create_load_job(@project, @dataset, table_id, upload_source, job_id, @fields, load_options)
  end
rescue Fluent::BigQuery::Error => e
  # Re-raise the same exception object for retryable failures.
  raise if e.retryable?

  flush_secondary(@secondary) if @secondary
end