Module: Fluent::BigQueryOutput::InsertImplementation

Defined in:
lib/fluent/plugin/out_bigquery.rb

Instance Method Summary collapse

Instance Method Details

#_write(chunk, table_id, template_suffix) ⇒ Object



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
# File 'lib/fluent/plugin/out_bigquery.rb', line 440

# Sends every record of +chunk+ to BigQuery with a single
# tabledata.insertAll request and reacts to the reported failures.
#
# Per-row errors and API exceptions are logged. Failures whose reason is
# listed in RETRYABLE_ERROR_REASON (and every Google::Apis::ServerError)
# are re-raised as a RuntimeError; other failures are forwarded to
# +flush_secondary+ when @secondary is set.
#
# @param chunk [#msgpack_each] buffer chunk; each yielded row must respond
#   to +deep_symbolize_keys+
# @param table_id identifier of the destination table
# @param template_suffix suffix added to the request body as
#   +template_suffix+; omitted from the body when nil/false
# @return [Object, nil] nil when nothing had to be handed to the secondary
#   output, otherwise whatever +flush_secondary+ returns
# @raise [RuntimeError] on retryable failures, and right after a missing
#   table has been created via +create_table+
def _write(chunk, table_id, template_suffix)
  rows = []
  chunk.msgpack_each do |row_object|
    # TODO: row size limit
    rows << row_object.deep_symbolize_keys
  end

  body = {
    rows: rows,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
  }
  body[:template_suffix] = template_suffix if template_suffix

  # The request options are passed as real keyword arguments: a braced
  # trailing Hash is no longer converted to keywords on Ruby >= 3.0 and
  # would arrive as an extra positional argument instead.
  res = client.insert_all_table_data(
    @project, @dataset, table_id, body,
    options: {timeout_sec: @request_timeout_sec, open_timeout_sec: @request_open_timeout_sec}
  )

  insert_errors = res.insert_errors
  # nil and an empty list both mean that no row was rejected.
  return if insert_errors.nil? || insert_errors.empty?

  reasons = []
  insert_errors.each do |row_errors|
    row_errors.errors.each do |err|
      reasons << err.reason
      log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, message: err.message, reason: err.reason
    end
  end

  raise "failed to insert into bigquery, retry" if reasons.any? { |r| RETRYABLE_ERROR_REASON.include?(r) }
  # Rows rejected only as "invalid" are dropped on purpose when skipping is enabled.
  return if reasons.all? { |r| r == "invalid" } && @skip_invalid_rows
  flush_secondary(@secondary) if @secondary
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil

  if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
    # Table Not Found: Auto Create Table
    create_table(table_id)
    # Raising makes the caller send the same rows again now that the table exists.
    raise "table created. send rows next time."
  end

  reason = e.respond_to?(:reason) ? e.reason : nil
  log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

  if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
    raise "failed to insert into bigquery, retry" # TODO: error class
  elsif @secondary
    flush_secondary(@secondary)
  end
end

#format(tag, time, record) ⇒ Object



423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
# File 'lib/fluent/plugin/out_bigquery.rb', line 423

def format(tag, time, record)
  fetch_schema if @template_suffix

  if @replace_record_key
    record = replace_record_key(record)
  end

  buf = String.new
  row = @fields.format(@add_time_field.call(record, time))
  unless row.empty?
    row = {"json" => row}
    row["insert_id"] = @get_insert_id.call(record) if @get_insert_id
    buf << row.to_msgpack
  end
  buf
end