Class: Fluent::Plugin::BigQueryInsertOutput

Inherits:
BigQueryBaseOutput show all
Defined in:
lib/fluent/plugin/out_bigquery_insert.rb

Instance Method Summary collapse

Methods inherited from BigQueryBaseOutput

#clustering_fields, #fetch_schema, #fetch_schema_target_table, #get_schema, #multi_workers_ready?, #request_timeout_sec, #start, #time_partitioning_type, #writer

Instance Method Details

#buffer ⇒ Object

Buffer



33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 33

# Buffer defaults tuned for streaming inserts: small chunks flushed very
# frequently, so rows reach BigQuery with low latency.
config_section :buffer do
  config_set_default :@type, "memory"
  config_set_default :flush_mode, :interval
  config_set_default :flush_interval, 1
  config_set_default :flush_thread_interval, 0.05
  config_set_default :flush_thread_burst_interval, 0.05
  config_set_default :chunk_limit_size, 1 * 1024 ** 2 # 1MB
  config_set_default :total_limit_size, 1 * 1024 ** 3 # 1GB
  # Cap rows per chunk as well as bytes, so one insert request stays small.
  config_set_default :chunk_limit_records, 500
end

#configure(conf) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 44

# Validates and applies the plugin configuration.
#
# - Builds a record_accessor for +insert_id_field+ (used for BigQuery
#   best-effort row deduplication), warning when the legacy dot-notation
#   form is used without a record_accessor prefix ("$." / "$[").
# - Rejects any <format> section whose @type is not "json", since this
#   plugin serializes chunks as JSON lines.
# - Validates that all placeholders used in project/dataset/table settings
#   are resolvable.
#
# @param conf [Fluent::Config::Element] parsed plugin configuration
# @raise [Fluent::ConfigError] if a non-json formatter is configured or a
#   placeholder cannot be satisfied
def configure(conf)
  super

  if @insert_id_field
    # Legacy dotted paths (e.g. "a.b") that are not record_accessor syntax
    # are deprecated; warn but still build an accessor from the raw string.
    if @insert_id_field !~ /^\$[\[\.]/ && @insert_id_field =~ /\./
      warn "[BREAKING CHANGE] insert_id_field format is changed. Use fluentd record_accessor helper. (https://docs.fluentd.org/v1.0/articles/api-plugin-helper-record_accessor)"
    end
    @get_insert_id = record_accessor_create(@insert_id_field)
  end

  formatter_config = conf.elements("format")[0]
  if formatter_config && formatter_config['@type'] != "json"
    raise ConfigError, "`bigquery_insert` supports only json formatter."
  end
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', type: 'json', conf: formatter_config)

  placeholder_params = "project=#{@project}/dataset=#{@dataset}/table=#{@tablelist.join(",")}/fetch_schema_table=#{@fetch_schema_table}/template_suffix=#{@template_suffix}"
  placeholder_validate!(:bigquery_insert, placeholder_params)
end

#format(tag, time, record) ⇒ Object

for Fluent::Plugin::Output#implement? method



65
66
67
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 65

# Delegates formatting to the base class. Defined explicitly (per the note
# in the docs above) so Fluent::Plugin::Output#implement? detects that this
# plugin supports custom formatting.
def format(tag, time, record)
  super
end

#insert(project, dataset, table_id, rows, schema, template_suffix) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 98

# Streams +rows+ into the target table via the writer.
#
# On a retryable BigQuery error the exception propagates unchanged so the
# normal buffer retry machinery handles it. On a non-retryable error the
# plugin's retry state is replaced before re-raising:
# - with a secondary output configured, the near-zero secondary_threshold
#   pushes the very next attempt to the secondary immediately;
# - without one, max_steps: 0 stops retrying this chunk altogether.
def insert(project, dataset, table_id, rows, schema, template_suffix)
  writer.insert_rows(project, dataset, table_id, rows, schema, template_suffix: template_suffix)
rescue Fluent::BigQuery::Error => e
  # Retryable errors keep the default retry behavior.
  raise if e.retryable?

  if @secondary
    # TODO: find better way
    # Float::EPSILON threshold => fail over to the secondary on the next try.
    @retry = retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: false, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      secondary: true, secondary_threshold: Float::EPSILON,
      randomize: @buffer_config.retry_randomize
    )
  else
    # No secondary: abort retries for this non-retryable error (max_steps: 0).
    @retry = retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: false, max_steps: 0, backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      randomize: @buffer_config.retry_randomize
    )
  end

  # Re-raise the original error so the output framework records the failure.
  raise
end

#write(chunk) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_bigquery_insert.rb', line 69

# Flushes one buffer chunk to BigQuery via the streaming insert API.
#
# Each line in the chunk is a JSON-serialized record (guaranteed by
# #configure, which only permits the json formatter). Every record is
# wrapped into the insertAll row shape ({"json" => record}), optionally
# tagged with a per-row insert_id for best-effort deduplication, and
# optionally stamped with an insert timestamp field.
#
# @param chunk [Fluent::Plugin::Buffer::Chunk] buffered events to flush
def write(chunk)
  # Round-robin over the configured table list so writes spread evenly.
  table_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end

  # @add_insert_timestamp doubles as the destination field name.
  now = Time.now.utc.strftime("%Y-%m-%d %H:%M:%S.%6N") if @add_insert_timestamp

  rows = chunk.open do |io|
    io.map do |line|
      record = MultiJson.load(line)
      record[@add_insert_timestamp] = now if @add_insert_timestamp
      row = {"json" => record}
      row["insert_id"] = @get_insert_id.call(record) if @get_insert_id
      Fluent::BigQuery::Helper.deep_symbolize_keys(row)
    end
  end

  # Resolve tag/time/record placeholders against the chunk's metadata.
  metadata = chunk.metadata
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = extract_placeholders(table_format, metadata)
  template_suffix = @template_suffix ? extract_placeholders(@template_suffix, metadata) : nil
  schema = get_schema(project, dataset, metadata)

  insert(project, dataset, table_id, rows, schema, template_suffix)
end