Module: Fluent::BigQueryOutput::LoadImplementation

Defined in:
lib/fluent/plugin/out_bigquery.rb

Instance Method Summary collapse

Instance Method Details

#_write(chunk, table_id_format) ⇒ Object



451
452
453
454
# File 'lib/fluent/plugin/out_bigquery.rb', line 451

# Resolves the destination table id for +chunk+ and hands the chunk to #load.
#
# The table id comes from generate_table_id, called with +table_id_format+,
# the current engine time (Fluent::Engine.now wrapped in a Time), a nil row
# argument, and the chunk itself.
#
# @param chunk the buffered chunk to upload (passed through unchanged)
# @param table_id_format the table id template given to generate_table_id
# @return whatever #load returns
def _write(chunk, table_id_format)
  current_time = Time.at(Fluent::Engine.now)
  load(chunk, generate_table_id(table_id_format, current_time, nil, chunk))
end

#format(tag, time, record) ⇒ Object



438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/fluent/plugin/out_bigquery.rb', line 438

# Serializes one event into a single newline-terminated JSON line.
#
# When @replace_record_key is set, the record is first passed through
# replace_record_key. The record and time are then given to the
# @add_time_field callable, and its result is run through @fields.format
# to produce the row.
#
# @param tag the event tag (not used by this method)
# @param time the event time, forwarded to @add_time_field
# @param record the event record
# @return [String] MultiJson.dump(row) followed by "\n", or an empty string
#   when the formatted row is empty
def format(tag, time, record)
  record = replace_record_key(record) if @replace_record_key
  row = @fields.format(@add_time_field.call(record, time))
  # An empty row contributes nothing to the buffer.
  return '' if row.empty?

  MultiJson.dump(row) + "\n"
end

#load(chunk, table_id) ⇒ Object



456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# File 'lib/fluent/plugin/out_bigquery.rb', line 456

# Submits a load job that appends the contents of +chunk+ to +table_id+ in
# @project / @dataset, then passes the job response to wait_load.
#
# The upload source is provided by create_upload_source, which yields it to
# the block; the job is inserted through client.insert_job with the schema
# taken from @fields.to_a.
#
# @param chunk the buffered chunk handed to create_upload_source
# @param table_id the destination table id
# @return whatever wait_load returns
def load(chunk, table_id)
  response = nil
  create_upload_source(chunk) do |upload_source|
    # Fetch the client before building the request, matching the original
    # evaluation order.
    bigquery = client
    destination = {
      project_id: @project,
      dataset_id: @dataset,
      table_id: table_id,
    }
    load_config = {
      destination_table: destination,
      schema: { fields: @fields.to_a },
      write_disposition: "WRITE_APPEND",
      source_format: "NEWLINE_DELIMITED_JSON"
    }
    upload_options = {
      upload_source: upload_source,
      content_type: "application/octet-stream"
    }
    response = bigquery.insert_job(@project, { configuration: { load: load_config } }, upload_options)
  end
  # response is still nil here if create_upload_source never yielded.
  wait_load(response, table_id)
end