440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
|
# File 'lib/fluent/plugin/out_bigquery.rb', line 440
# Sends every record of +chunk+ to a BigQuery table in one
# tabledata.insertAll request and reacts to the failures that come back.
#
# Per-row insert errors are logged one by one, then:
# * if any reason is listed in RETRYABLE_ERROR_REASON, raise so the write is retried;
# * if every reason is "invalid" and @skip_invalid_rows is set, give up on those rows silently;
# * otherwise hand the chunk to @secondary when one is configured.
#
# Google API errors reset @cached_client, then:
# * a 404 "Not Found: Table" with @auto_create_table set creates the table and
#   raises, so the rows are sent again on a later attempt;
# * a retryable reason or any ServerError raises so the write is retried;
# * anything else hands the chunk to @secondary when one is configured.
#
# @param chunk [#msgpack_each] source of records; every yielded record must
#   respond to +deep_symbolize_keys+
# @param table_id [Object] identifier of the destination table
# @param template_suffix [Object, nil] added to the request body as
#   +:template_suffix+ when truthy
# @return [Object, nil] the result of +flush_secondary+ when the chunk is
#   diverted to @secondary, otherwise nil
# @raise [RuntimeError] when the insert should be retried, or right after the
#   missing table has been created
def _write(chunk, table_id, template_suffix)
  rows = []
  chunk.msgpack_each { |record| rows << record.deep_symbolize_keys }

  body = {
    rows: rows,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
  }
  body[:template_suffix] = template_suffix if template_suffix

  # `options:` is passed brace-less on purpose. A braced hash here is a fifth
  # positional argument on Ruby 3+, which a keyword-based client signature
  # rejects with ArgumentError (the client's signature is not visible in this
  # file; presumably it is the generated Google API client). The brace-less
  # form still reaches a callee without keywords as a trailing hash.
  res = client.insert_all_table_data(
    @project, @dataset, table_id, body,
    options: {timeout_sec: @request_timeout_sec, open_timeout_sec: @request_open_timeout_sec}
  )

  insert_errors = res.insert_errors
  # nil and an empty list both mean that no row was rejected; without the
  # empty check a clean insert would still be diverted to @secondary.
  return if insert_errors.nil? || insert_errors.empty?

  reasons = []
  insert_errors.each do |row_failure|
    row_failure.errors.each do |err|
      reasons << err.reason
      log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, message: err.message, reason: err.reason
    end
  end

  raise "failed to insert into bigquery, retry" if reasons.any? { |r| RETRYABLE_ERROR_REASON.include?(r) }
  return if @skip_invalid_rows && reasons.all? { |r| r == "invalid" }

  flush_secondary(@secondary) if @secondary
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # Forget the cached client after any API failure (presumably so that the
  # next call builds a fresh one; the accessor is defined outside this method).
  @cached_client = nil

  if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
    create_table(table_id)
    # Raising here makes the caller send the same rows again once the table exists.
    raise "table created. send rows next time."
  end

  # Not every rescued error class is guaranteed to expose #reason.
  reason = e.respond_to?(:reason) ? e.reason : nil
  log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

  if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
    raise "failed to insert into bigquery, retry"
  end

  flush_secondary(@secondary) if @secondary
end
|