509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
|
# File 'lib/fluent/plugin/out_bigquery.rb', line 509
def _write(chunk, table_id, template_suffix)
res = nil
job_id = nil
create_upload_source(chunk) do |upload_source|
configuration, job_id = load_configuration(table_id, template_suffix, upload_source)
res = client.insert_job(
@project,
configuration,
{
upload_source: upload_source,
content_type: "application/octet-stream",
options: {
timeout_sec: @request_timeout_sec,
open_timeout_sec: @request_open_timeout_sec,
}
}
)
end
wait_load(res.job_reference.job_id)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
@cached_client = nil
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "job.insert API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
return wait_load(job_id) if job_id && e.status_code == 409 && e.message =~ /Job/
if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
raise "failed to insert into bigquery, retry" elsif @secondary
flush_secondary(@secondary)
end
end
|