Module: Fluent::BigQueryOutput::InsertImplementation

Defined in:
lib/fluent/plugin/out_bigquery.rb

Instance Method Summary collapse

Instance Method Details

#_write(chunk, table_format, template_suffix_format) ⇒ Object



426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
# File 'lib/fluent/plugin/out_bigquery.rb', line 426

# Flushes one buffer chunk: decodes its records, groups them by destination
# table (and optional template suffix), and issues one #insert per group.
#
# @param chunk [#msgpack_each] buffer chunk yielding one row Hash per record;
#   each row is expected to carry its payload under the "json" key
# @param table_format [Object] format handed to generate_table_id to resolve
#   the destination table id of each row
# @param template_suffix_format [Object, nil] format handed to
#   generate_table_id to resolve the template suffix; when falsy every row
#   gets a nil suffix
# @return [Hash] the grouping that was iterated
#   ([table_id, template_suffix] => rows)
def _write(chunk, table_format, template_suffix_format)
  # @add_insert_timestamp doubles as the on/off switch and as the name of the
  # field that receives the timestamp; one timestamp is shared by the chunk.
  timestamp_field = @add_insert_timestamp
  inserted_at = Time.now.utc.strftime("%Y-%m-%d %H:%M:%S.%6N") if timestamp_field

  rows = chunk.to_enum(:msgpack_each).map do |record|
    # TODO: row size limit
    record["json"][timestamp_field] = inserted_at if timestamp_field
    record.deep_symbolize_keys
  end

  # Table ids are resolved against the engine clock, not the wall clock above.
  flush_time = Time.at(Fluent::Engine.now)
  batches = rows.group_by do |row|
    table_id = generate_table_id(table_format, flush_time, row, chunk)
    suffix = generate_table_id(template_suffix_format, flush_time, row, chunk) if template_suffix_format
    [table_id, suffix]
  end

  batches.each do |(table_id, suffix), batch|
    insert(table_id, batch, suffix)
  end
end

#format(tag, time, record) ⇒ Object



405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# File 'lib/fluent/plugin/out_bigquery.rb', line 405

# Serializes a single event into the msgpack payload stored in the buffer.
#
# The record is optionally passed through replace_record_key and
# convert_hash_to_json, given its time field via @add_time_field, and then
# formatted by @fields. A non-empty result is wrapped as {"json" => row},
# plus an "insert_id" entry when @get_insert_id is configured.
#
# @param tag [Object] event tag (not used by this method)
# @param time [Object] event time, forwarded to @add_time_field
# @param record [Hash] event record
# @return [String] msgpack-encoded row, or an empty string when the formatted
#   row is empty
def format(tag, time, record)
  fetch_schema if @template_suffix

  record = replace_record_key(record) if @replace_record_key
  record = convert_hash_to_json(record) if @convert_hash_to_json

  formatted = @fields.format(@add_time_field.call(record, time))
  # Nothing to send for this event: hand back an empty buffer.
  return String.new if formatted.empty?

  payload = {"json" => formatted}
  # The insert id is derived from the (possibly rewritten) record, not from
  # the formatted row.
  payload['insert_id'] = @get_insert_id.call(record) if @get_insert_id
  String.new << payload.to_msgpack
end

#insert(table_id, rows, template_suffix) ⇒ Object



447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# File 'lib/fluent/plugin/out_bigquery.rb', line 447

# Sends one group of rows to a single table through the writer.
#
# On Fluent::BigQuery::Error:
# * with @auto_create_table set and a 404 "Not Found: Table" error, the table
#   is created and a RuntimeError is raised so the rows are sent on a later
#   attempt;
# * a retryable error is re-raised;
# * any other error is handed to flush_secondary when @secondary is set.
#
# @param table_id [Object] destination table id
# @param rows [Array] rows to insert
# @param template_suffix [Object, nil] passed to the writer as template_suffix:
# @return [Object, nil] the writer's result on success; after a handled
#   non-retryable error, flush_secondary's result or nil
# @raise [RuntimeError] after auto-creating a missing table
# @raise [Fluent::BigQuery::Error] when the error reports itself retryable
def insert(table_id, rows, template_suffix)
  writer.insert_rows(@project, @dataset, table_id, rows, template_suffix: template_suffix)
rescue Fluent::BigQuery::Error => error
  missing_table = @auto_create_table && error.status_code == 404 && /Not Found: Table/i =~ error.message
  if missing_table
    # Table Not Found: Auto Create Table
    writer.create_table(@project, @dataset, table_id, @fields)
    raise "table created. send rows next time."
  end

  raise error if error.retryable? # TODO: error class

  # NOTE(review): a non-retryable error with no @secondary falls through
  # here and is dropped silently — confirm this is intended.
  flush_secondary(@secondary) if @secondary
end