Class: Fluent::Plugin::BigQueryInsertOutput
- Inherits: BigQueryBaseOutput (ancestors: Object → Output → BigQueryBaseOutput → Fluent::Plugin::BigQueryInsertOutput)
- Defined in: lib/fluent/plugin/out_bigquery_insert.rb
Instance Method Summary
- #buffer ⇒ Object: Buffer.
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object: for Fluent::Plugin::Output#implement? method.
- #insert(project, dataset, table_id, rows, schema, template_suffix) ⇒ Object
- #write(chunk) ⇒ Object
Methods inherited from BigQueryBaseOutput
#clustering_fields, #fetch_schema, #fetch_schema_target_table, #get_schema, #multi_workers_ready?, #request_timeout_sec, #start, #time_partitioning_type, #writer
Instance Method Details
#buffer ⇒ Object
Buffer
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 33

config_section :buffer do
  config_set_default :@type, "memory"
  config_set_default :flush_mode, :interval
  config_set_default :flush_interval, 1
  config_set_default :flush_thread_interval, 0.05
  config_set_default :flush_thread_burst_interval, 0.05
  config_set_default :chunk_limit_size, 1 * 1024 ** 2 # 1MB
  config_set_default :total_limit_size, 1 * 1024 ** 3 # 1GB
  config_set_default :chunk_limit_records, 500
end
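These defaults favor streaming inserts: small chunks flushed every second from a memory buffer. A minimal sketch of overriding them from a Fluentd config, assuming a hypothetical my.logs.** match tag; the values shown mirror the defaults above rather than recommendations:

<match my.logs.**>
  @type bigquery_insert
  # project/dataset/table/auth options omitted
  <buffer>
    @type memory
    flush_interval 1
    chunk_limit_size 1m       # 1MB, keeps each insertAll request small
    chunk_limit_records 500   # matches BigQuery's recommended rows per request
    total_limit_size 1g
  </buffer>
</match>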
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 44

def configure(conf)
  super

  if @insert_id_field
    if @insert_id_field !~ /^\$[\[\.]/ && @insert_id_field =~ /\./
      warn "[BREAKING CHANGE] insert_id_field format is changed. Use fluentd record_accessor helper. (https://docs.fluentd.org/v1.0/articles/api-plugin-helper-record_accessor)"
    end
    @get_insert_id = record_accessor_create(@insert_id_field)
  end

  formatter_config = conf.elements("format")[0]
  if formatter_config && formatter_config['@type'] != "json"
    raise ConfigError, "`bigquery_insert` supports only json formatter."
  end
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', type: 'json', conf: formatter_config)

  placeholder_params = "project=#{@project}/dataset=#{@dataset}/table=#{@tablelist.join(",")}/fetch_schema_table=#{@fetch_schema_table}/template_suffix=#{@template_suffix}"
  placeholder_validate!(:bigquery_insert, placeholder_params)
end
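Note the breaking change: insert_id_field is now resolved with Fluentd's record_accessor helper, so nested fields need the $. (or $[) syntax; a bare dotted path such as event.uuid is treated as a single flat key and triggers the warning above. A sketch with made-up field names:

<match my.logs.**>
  @type bigquery_insert
  insert_id_field $.event.uuid   # record_accessor syntax for a nested field
  <format>
    @type json                   # any other @type raises a ConfigError here
  </format>
  # project/dataset/table/auth options omitted
</match>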
#format(tag, time, record) ⇒ Object
Defined so that Fluent::Plugin::Output#implement? detects that this plugin implements format.
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 65

def format(tag, time, record)
  super
end
#insert(project, dataset, table_id, rows, schema, template_suffix) ⇒ Object
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 98

def insert(project, dataset, table_id, rows, schema, template_suffix)
  writer.insert_rows(project, dataset, table_id, rows, schema, template_suffix: template_suffix)
rescue Fluent::BigQuery::Error => e
  raise if e.retryable?

  if @secondary
    # TODO: find better way
    @retry = retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: false, max_steps: @buffer_config.retry_max_times,
      backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      secondary: true, secondary_threshold: Float::EPSILON,
      randomize: @buffer_config.retry_randomize
    )
  else
    @retry = retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: false, max_steps: 0,
      backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      randomize: @buffer_config.retry_randomize
    )
  end

  raise
end
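Reading the rescue branch: retryable errors are re-raised as-is so the normal buffer retry loop applies, while non-retryable errors rebuild the retry state so the chunk is not sent to BigQuery again; with a secondary configured, secondary_threshold: Float::EPSILON routes the chunk to the secondary on the next attempt, and without one, max_steps: 0 exhausts retries immediately. A hedged sketch of pairing this with a file secondary; the path is illustrative:

<match my.logs.**>
  @type bigquery_insert
  # project/dataset/table/auth options omitted
  <secondary>
    @type file                 # receives chunks that failed with non-retryable errors
    path /var/log/fluent/bigquery_insert_failed
  </secondary>
</match>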
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 69

def write(chunk)
  table_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end

  now = Time.now.utc.strftime("%Y-%m-%d %H:%M:%S.%6N") if @add_insert_timestamp

  rows = chunk.open do |io|
    io.map do |line|
      record = MultiJson.load(line)
      record[@add_insert_timestamp] = now if @add_insert_timestamp
      row = {"json" => record}
      row["insert_id"] = @get_insert_id.call(record) if @get_insert_id
      Fluent::BigQuery::Helper.deep_symbolize_keys(row)
    end
  end

  metadata = chunk.metadata
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = extract_placeholders(table_format, metadata)
  template_suffix = @template_suffix ? extract_placeholders(@template_suffix, metadata) : nil
  schema = get_schema(project, dataset, metadata)
  insert(project, dataset, table_id, rows, schema, template_suffix)
end
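Each buffered line is decoded from JSON and wrapped into the {json: record, insert_id: ...} row shape expected by the streaming insert API, with placeholders in project, dataset, and table resolved from the chunk metadata. If add_insert_timestamp is set, every row in the chunk is stamped with the same UTC write time. A sketch, assuming a hypothetical inserted_at column that exists in the target table schema:

<match my.logs.**>
  @type bigquery_insert
  add_insert_timestamp inserted_at   # adds e.g. "2024-01-01 00:00:00.000000" to each record
  # project/dataset/table/auth options omitted
</match>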