Module: Fluent::Plugin::BigQueryOutput::LoadImplementation

Defined in:
lib/fluent/plugin/out_bigquery.rb

Instance Method Summary collapse

Instance Method Details

#_write(chunk, table_id_format) ⇒ Object



439
440
441
442
443
444
445
446
447
# File 'lib/fluent/plugin/out_bigquery.rb', line 439

# Resolves buffer placeholders and dispatches the chunk to a BigQuery load job.
#
# NOTE(review): the scraped source had bare `chunk.` receivers (invalid Ruby);
# restored to `chunk.metadata`, which is what `extract_placeholders` consumes
# in the upstream plugin — confirm against lib/fluent/plugin/out_bigquery.rb.
#
# @param chunk [Fluent::Plugin::Buffer::Chunk] buffered records to upload
# @param table_id_format [String] table id template, may contain placeholders
# @return [Object] result of #load
def _write(chunk, table_id_format)
  # Expand ${tag}/%Y%m%d-style placeholders using the chunk's metadata.
  project = extract_placeholders(@project, chunk.metadata)
  dataset = extract_placeholders(@dataset, chunk.metadata)
  table_id = extract_placeholders(table_id_format, chunk.metadata)

  schema = get_schema(project, dataset, chunk.metadata)

  load(chunk, project, dataset, table_id, schema)
end

#load(chunk, project, dataset, table_id, schema) ⇒ Object



449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# File 'lib/fluent/plugin/out_bigquery.rb', line 449

# Uploads the chunk to BigQuery as a load job.
#
# On a non-retryable BigQuery error, the plugin's retry state is rebuilt so
# that Fluentd either hands the chunk to the secondary output immediately
# (secondary_threshold: Float::EPSILON) or gives up at once (max_steps: 0),
# then the original error is re-raised.
#
# @param chunk [Fluent::Plugin::Buffer::Chunk]
# @param project [String]
# @param dataset [String]
# @param table_id [String]
# @param schema [Object] table schema passed through to the writer
# @raise [Fluent::BigQuery::Error] always re-raised after adjusting retry state
def load(chunk, project, dataset, table_id, schema)
  res = nil

  create_upload_source(chunk) do |upload_source|
    res = writer.create_load_job(chunk.unique_id, project, dataset, table_id, upload_source, schema)
  end
rescue Fluent::BigQuery::Error => e
  raise if e.retryable?

  # TODO: find better way
  # Options shared by both retry-state variants; the branches below differ
  # only in max_steps and the secondary-specific settings.
  common_opts = {
    forever: false,
    backoff_base: @buffer_config.retry_exponential_backoff_base,
    max_interval: @buffer_config.retry_max_interval,
    randomize: @buffer_config.retry_randomize
  }

  @retry =
    if @secondary
      retry_state_create(
        :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
        max_steps: @buffer_config.retry_max_times,
        secondary: true, secondary_threshold: Float::EPSILON,
        **common_opts
      )
    else
      retry_state_create(
        :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
        max_steps: 0,
        **common_opts
      )
    end

  raise
end