Module: Fluent::BigQueryOutput::InsertImplementation
- Defined in:
- lib/fluent/plugin/out_bigquery.rb
Instance Method Summary
- #_write(chunk, table_format, template_suffix_format) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #insert(table_id, rows, template_suffix) ⇒ Object
Instance Method Details
#_write(chunk, table_format, template_suffix_format) ⇒ Object
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 393
#
# Reads every record out of +chunk+, groups the records by destination
# (table id plus optional template suffix) and calls #insert once per group.
#
# @param chunk [Object] buffer chunk; must respond to +msgpack_each+
# @param table_format [Object] passed to +generate_table_id+ to derive the table id
# @param template_suffix_format [Object, nil] passed to +generate_table_id+ to
#   derive the template suffix; when falsy the suffix is +nil+
# @return [Hash] the grouped rows, keyed by +[table_id, template_suffix]+
def _write(chunk, table_format, template_suffix_format)
  symbolized_rows = []
  # TODO: row size limit
  chunk.msgpack_each { |row_object| symbolized_rows << row_object.deep_symbolize_keys }

  # One timestamp is shared by every row in the chunk.
  flush_time = Time.at(Fluent::Engine.now)

  batches = symbolized_rows.group_by do |row|
    table_id = generate_table_id(table_format, flush_time, row, chunk)
    suffix = (generate_table_id(template_suffix_format, flush_time, row, chunk) if template_suffix_format)
    [table_id, suffix]
  end

  batches.each do |destination, batch_rows|
    table_id, template_suffix = destination
    insert(table_id, batch_rows, template_suffix)
  end
end
#format(tag, time, record) ⇒ Object
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 372
#
# Serializes one record into a msgpack-encoded row for the buffer.
#
# The record is optionally passed through +replace_record_key+ and
# +convert_hash_to_json+ (when the matching options are set), given to
# +@add_time_field+, and then formatted by +@fields+. An empty formatted
# row produces an empty buffer string.
#
# @param tag [Object] unused by this method
# @param time [Object] event time, handed to +@add_time_field+
# @param record [Object] the event record
# @return [String] msgpack bytes of +{"json" => row}+ (plus +'insert_id'+ when
#   +@get_insert_id+ is set), or an empty string when the formatted row is empty
def format(tag, time, record)
  fetch_schema if @template_suffix

  record = replace_record_key(record) if @replace_record_key
  record = convert_hash_to_json(record) if @convert_hash_to_json

  packed = String.new
  formatted = @fields.format(@add_time_field.call(record, time))
  return packed if formatted.empty?

  payload = { "json" => formatted }
  payload['insert_id'] = @get_insert_id.call(record) if @get_insert_id
  packed << payload.to_msgpack
end
#insert(table_id, rows, template_suffix) ⇒ Object
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 |
# File 'lib/fluent/plugin/out_bigquery.rb', line 412
#
# Sends +rows+ to +table_id+ through +writer.insert_rows+ and handles
# Fluent::BigQuery::Error failures.
#
# Error handling, in order:
# 1. With +@auto_create_table+ set, a 404 whose message matches
#    "Not Found: Table" (case-insensitive) triggers +writer.create_table+,
#    then a RuntimeError is raised so the rows are sent again later.
# 2. A retryable error is re-raised unchanged.
# 3. A non-retryable error is handed to +flush_secondary+ when +@secondary+
#    is set; otherwise it is swallowed and the method returns +nil+.
#
# @param table_id [Object] destination table id
# @param rows [Array] rows to insert
# @param template_suffix [Object, nil] forwarded as +template_suffix:+
# @raise [RuntimeError] after the missing table has been created
# @raise [Fluent::BigQuery::Error] when the error reports itself retryable
def insert(table_id, rows, template_suffix)
  writer.insert_rows(@project, @dataset, table_id, rows,
                     skip_invalid_rows: @skip_invalid_rows,
                     ignore_unknown_values: @ignore_unknown_values,
                     template_suffix: template_suffix)
rescue Fluent::BigQuery::Error => e
  if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
    # Table Not Found: Auto Create Table
    writer.create_table(@project, @dataset, table_id, @fields,
                        time_partitioning_type: @time_partitioning_type,
                        time_partitioning_expiration: @time_partitioning_expiration)
    raise "table created. send rows next time."
  end

  if e.retryable?
    raise e # TODO: error class
  elsif @secondary
    flush_secondary(@secondary)
  end
end