Module: Fluent::BigQueryOutput::InsertImplementation

Defined in:
lib/fluent/plugin/out_bigquery.rb

Instance Method Summary collapse

Instance Method Details

#_write(chunk, table_format) ⇒ Object



406
407
408
409
410
411
412
413
414
415
416
# File 'lib/fluent/plugin/out_bigquery.rb', line 406

# Decodes every msgpack row buffered in +chunk+, groups the rows by their
# destination table id and issues one #insert call per table.
#
# @param chunk [#msgpack_each] buffer chunk yielding one decoded row per entry
# @param table_format [Object] passed through to #generate_table_id
def _write(chunk, table_format)
  rows = []
  chunk.msgpack_each do |row_object|
    # TODO: row size limit
    rows << row_object.deep_symbolize_keys
  end

  # Resolve the current time once per chunk: it is invariant across rows, and
  # re-reading the clock for every row could route rows of the same chunk to
  # different tables if the clock ticks while grouping.
  now = Time.at(Fluent::Engine.now)
  rows.group_by { |row| generate_table_id(table_format, now, row, chunk) }.each do |table_id, group|
    insert(table_id, group)
  end
end

#format(tag, time, record) ⇒ Object



386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# File 'lib/fluent/plugin/out_bigquery.rb', line 386

# Serializes one event into the msgpack payload that is appended to the buffer.
#
# The record is optionally passed through #replace_record_key and
# #convert_hash_to_json, gets its time field added, and is then formatted by
# @fields. An empty formatted row produces an empty string.
#
# @param tag [Object] not used by this method
# @param time [Object] handed to @add_time_field together with the record
# @param record [Hash] the event record
# @return [String] msgpack of {"json" => row} (plus 'insert_id' when
#   @get_insert_id is set), or '' when the formatted row is empty
def format(tag, time, record)
  record = replace_record_key(record) if @replace_record_key
  record = convert_hash_to_json(record) if @convert_hash_to_json

  formatted = @fields.format(@add_time_field.call(record, time))
  return '' if formatted.empty?

  payload = {"json" => formatted}
  payload['insert_id'] = @get_insert_id.call(record) if @get_insert_id
  '' << payload.to_msgpack
end

#insert(table_id, rows) ⇒ Object



418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
# File 'lib/fluent/plugin/out_bigquery.rb', line 418

# Streams +rows+ into +table_id+ via the tabledata.insertAll API.
#
# A successful HTTP response can still carry per-row failures in its
# insert_errors field; those are logged as a warning so rejected rows are not
# dropped silently. The API response is returned unchanged.
#
# On an API error the cached client is cleared. If @auto_create_table is set
# and the error is a 404 "Not Found: Table", the table is created and an error
# is raised so the rows are sent again on a later attempt; any other API error
# is logged and re-raised as a RuntimeError.
#
# @param table_id [String] destination table id
# @param rows [Array<Hash>] rows to insert
# @return [Object] the response returned by insert_all_table_data
# @raise [RuntimeError] when the API call fails or the table was just created
def insert(table_id, rows)
  res = client.insert_all_table_data(@project, @dataset, table_id, {
    rows: rows
  }, {})

  # Surface partial failures reported inside an otherwise successful response.
  insert_errors = res.respond_to?(:insert_errors) ? res.insert_errors : nil
  if insert_errors && !insert_errors.empty?
    log.warn "insert errors", project_id: @project, dataset: @dataset, table: table_id, insert_errors: insert_errors.to_s
  end

  res
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil

  message = e.message
  if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ message.to_s
    # Table Not Found: Auto Create Table
    create_table(table_id)
    raise "table created. send rows next time."
  end
  log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: message
  raise "failed to insert into bigquery" # TODO: error class
end