Module: Fluent::BigQueryOutput::InsertImplementation
- Defined in:
- lib/fluent/plugin/out_bigquery.rb
Instance Method Summary collapse
- #_write(chunk, table_format) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #insert(table_id, rows) ⇒ Object
Instance Method Details
#_write(chunk, table_format) ⇒ Object
406 407 408 409 410 411 412 413 414 415 416 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 406
#
# Unpacks every record in +chunk+, symbolizes its keys, groups the resulting
# rows by destination table and hands each group to #insert.
#
# The table id for a row is whatever #generate_table_id returns for
# +table_format+, the current engine time, the row itself and the chunk.
#
# @param chunk [Object] buffer chunk; must respond to +msgpack_each+
# @param table_format [Object] passed through unchanged to #generate_table_id
# @return [Hash] the table_id => rows grouping that was iterated
def _write(chunk, table_format)
  symbolized_rows = []
  # TODO: row size limit
  chunk.msgpack_each { |record| symbolized_rows.push(record.deep_symbolize_keys) }

  rows_by_table = symbolized_rows.group_by do |row|
    # The engine time is read for each row, exactly once per row.
    generate_table_id(table_format, Time.at(Fluent::Engine.now), row, chunk)
  end

  rows_by_table.each_pair { |table_id, table_rows| insert(table_id, table_rows) }
end
#format(tag, time, record) ⇒ Object
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 386
#
# Serializes one event into the msgpack payload that #_write later reads back.
#
# The record is optionally passed through #replace_record_key and
# #convert_hash_to_json (when the matching flags are set), gets the time
# field added via @add_time_field, and is then formatted by @fields.
# A record that formats to an empty row produces an empty string.
#
# @param tag [Object] event tag (not used by this method)
# @param time [Object] event time, forwarded to @add_time_field
# @param record [Hash] event record
# @return [String] msgpack of {"json" => row[, "insert_id" => id]}, or ''
def format(tag, time, record)
  record = replace_record_key(record) if @replace_record_key
  record = convert_hash_to_json(record) if @convert_hash_to_json

  formatted = @fields.format(@add_time_field.call(record, time))
  return '' if formatted.empty?

  payload = {"json" => formatted}
  payload['insert_id'] = @get_insert_id.call(record) if @get_insert_id
  # Append into a fresh string, as the original did with its buffer.
  '' << payload.to_msgpack
end
#insert(table_id, rows) ⇒ Object
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 418
#
# Sends +rows+ to the tabledata.insertAll endpoint for +table_id+ in the
# configured @project / @dataset.
#
# On any Google API server, client or authorization error the cached client
# is dropped. If @auto_create_table is set and the error is a 404 whose
# message matches "Not Found: Table", the table is created and an error is
# raised so the caller retries the rows later. Every other API error is
# logged and re-raised as a generic failure.
#
# @param table_id [String] destination table id
# @param rows [Array<Hash>] rows to insert
# @return [Object] whatever +client.insert_all_table_data+ returns on success
# @raise [RuntimeError] "table created. send rows next time." after auto-creating the table
# @raise [RuntimeError] "failed to insert into bigquery" for any other API error
def insert(table_id, rows)
  client.insert_all_table_data(@project, @dataset, table_id, { rows: rows }, {})
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil

  # Capture the message once; it drives both the table-missing check and the log entry.
  message = e.message
  if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ message.to_s
    # Table Not Found: Auto Create Table
    create_table(table_id)
    raise "table created. send rows next time."
  end

  log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: message
  raise "failed to insert into bigquery" # TODO: error class
end