Class: Fluent::BigQueryOutput

Inherits:
TimeSlicedOutput
  • Object
Defined in:
lib/fluent/plugin/out_bigquery.rb

Defined Under Namespace

Modules: InsertImplementation, LoadImplementation

Constant Summary

REGEXP_MAX_NUM =

TODO: record field stream inserts don't work well?

At table creation, table type json + field type record -> field type validation fails
At streaming inserts, schema cannot be specified

config_param :field_record, :string, default: nil
config_param :optional_data_field, :string, default: nil

10

Instance Method Summary

Constructor Details

#initialize ⇒ BigQueryOutput

Table types: developers.google.com/bigquery/docs/tables

type - The following data types are supported; see Data Formats for details on each data type:
  • STRING
  • INTEGER
  • FLOAT
  • BOOLEAN
  • RECORD - A JSON object, used when importing nested records. This type is only available when using JSON source files.

mode - Whether a field can be null. The following values are supported:
  • NULLABLE - The cell can be null.
  • REQUIRED - The cell cannot be null.
  • REPEATED - Zero or more repeated simple or nested subfields. This mode is only supported when using JSON source files.
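As a rough sketch of the shape these types and modes take in a schema definition (field names here are hypothetical; this mirrors the array-of-hashes structure that load_schema receives once a schema file referenced by schema_path has been parsed):

# Hypothetical schema, for illustration only.
schema = [
  { "name" => "time",    "type" => "TIMESTAMP", "mode" => "REQUIRED" },
  { "name" => "status",  "type" => "INTEGER",   "mode" => "NULLABLE" },
  { "name" => "request", "type" => "RECORD",    "mode" => "NULLABLE",
    "fields" => [
      { "name" => "path", "type" => "STRING", "mode" => "NULLABLE" },
      { "name" => "tags", "type" => "STRING", "mode" => "REPEATED" }
    ] }
]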



# File 'lib/fluent/plugin/out_bigquery.rb', line 173

def initialize
  super
  require 'multi_json'
  require 'google/apis/bigquery_v2'
  require 'googleauth'
  require 'active_support/json'
  require 'active_support/core_ext/hash'
  require 'active_support/core_ext/object/json'

  # MEMO: signet-0.6.1 depends on Faraday.default_connection
  Faraday.default_connection.options.timeout = 60
end

Instance Method Details

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 186

def configure(conf)
  if conf["method"] == "load"
    configure_for_load(conf)
  else
    configure_for_insert(conf)
  end
  super

  case @method
  when :insert
    extend(InsertImplementation)
  when :load
    raise Fluent::ConfigError, "'template_suffix' is for only `insert` mode, instead use 'fetch_schema_table' and formatted table name" if @template_suffix
    extend(LoadImplementation)
  else
    raise Fluent::ConfigError "'method' must be 'insert' or 'load'"
  end

  case @auth_method
  when :private_key
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :compute_engine
    # Do nothing
  when :json_key
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when :application_default
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables ? @tables.split(',') : [@table]

  legacy_schema_config_deprecation
  @fields = Fluent::BigQuery::RecordSchema.new('record')
  if @schema
    @fields.load_schema(@schema)
  end
  if @schema_path
    @fields.load_schema(MultiJson.load(File.read(@schema_path)))
  end

  types = %w(string integer float boolean timestamp)
  types.each do |type|
    raw_fields = instance_variable_get("@field_#{type}")
    next unless raw_fields
    raw_fields.split(',').each do |field|
      @fields.register_field field.strip, type.to_sym
    end
  end

  @regexps = {}
  (1..REGEXP_MAX_NUM).each do |i|
    next unless conf["replace_record_key_regexp#{i}"]
    regexp, replacement = conf["replace_record_key_regexp#{i}"].split(/ /, 2)
    raise ConfigError, "replace_record_key_regexp#{i} does not contain 2 parameters" unless replacement
    raise ConfigError, "replace_record_key_regexp#{i} contains a duplicated key, #{regexp}" if @regexps[regexp]
    @regexps[regexp] = replacement
  end

  @localtime = false if @localtime.nil? && @utc

  @timef = TimeFormatter.new(@time_format, @localtime)

  if @time_field
    keys = @time_field.split('.')
    last_key = keys.pop
    @add_time_field = ->(record, time) {
      keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = @timef.format(time)
      record
    }
  else
    @add_time_field = ->(record, time) { record }
  end

  if @insert_id_field
    insert_id_keys = @insert_id_field.split('.')
    @get_insert_id = ->(record) {
      insert_id_keys.inject(record) {|h, k| h[k] }
    }
  else
    @get_insert_id = nil
  end

  warn "[DEPRECATION] `convert_hash_to_json` param is deprecated. If Hash value is inserted string field, plugin convert it to json automatically." if @convert_hash_to_json
end
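To make the time_field handling above concrete, a minimal standalone sketch (assuming a dotted time_field such as "log.timestamp", with Time#strftime standing in for @timef.format):

keys     = "log.timestamp".split('.')  # ["log", "timestamp"]
last_key = keys.pop                    # "timestamp"

add_time_field = ->(record, time) {
  # Walk/create nested hashes, then write the formatted time at the last key.
  keys.inject(record) { |h, k| h[k] ||= {} }[last_key] =
    Time.at(time).utc.strftime("%Y-%m-%d %H:%M:%S")
  record
}

add_time_field.call({ "msg" => "ok" }, 0)
# => {"msg"=>"ok", "log"=>{"timestamp"=>"1970-01-01 00:00:00"}}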

#configure_for_insert(conf) ⇒ Object

default for insert

Raises:

  • (ConfigError)


# File 'lib/fluent/plugin/out_bigquery.rb', line 23

def configure_for_insert(conf)
  raise ConfigError unless conf["method"] != "load"

  conf["buffer_type"]                = "lightening"  unless conf["buffer_type"]
  conf["flush_interval"]             = 0.25          unless conf["flush_interval"]
  conf["try_flush_interval"]         = 0.05          unless conf["try_flush_interval"]
  conf["buffer_chunk_limit"]         = 1 * 1024 ** 2 unless conf["buffer_chunk_limit"] # 1MB
  conf["buffer_queue_limit"]         = 1024          unless conf["buffer_queue_limit"]
  conf["buffer_chunk_records_limit"] = 500           unless conf["buffer_chunk_records_limit"]
end

#configure_for_load(conf) ⇒ Object

default for loads

Raises:

  • (ConfigError)


# File 'lib/fluent/plugin/out_bigquery.rb', line 35

def configure_for_load(conf)
  raise ConfigError unless conf["method"] == "load"

  # buffer_type, flush_interval, try_flush_interval is TimeSlicedOutput default
  conf["buffer_chunk_limit"] = 1 * 1024 ** 3 unless conf["buffer_chunk_limit"] # 1GB
  conf["buffer_queue_limit"] = 32            unless conf["buffer_queue_limit"]
end

#convert_hash_to_json(record) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 350

def convert_hash_to_json(record)
  record.each do |key, value|
    if value.class == Hash
      record[key] = MultiJson.dump(value)
    end
  end
  record
end
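For example, with a made-up record (called on the plugin instance), only Hash-valued entries are serialized:

record = { "user" => { "id" => 1, "name" => "alice" }, "status" => 200 }
convert_hash_to_json(record)
# => {"user"=>"{\"id\":1,\"name\":\"alice\"}", "status"=>200}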

#fetch_schema(allow_overwrite = true) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 375

def fetch_schema(allow_overwrite = true)
  table_id = nil
  @fetch_schema_mutex.synchronize do
    if Fluent::Engine.now - @last_fetch_schema_time > @schema_cache_expire
      table_id_format = @fetch_schema_table || @tablelist[0]
      table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now))
      schema = writer.fetch_schema(@project, @dataset, table_id)

      if schema
        if allow_overwrite
          fields = Fluent::BigQuery::RecordSchema.new("record")
          fields.load_schema(schema, allow_overwrite)
          @fields = fields
        else
          @fields.load_schema(schema, allow_overwrite)
        end
      else
        if @fields.empty?
          raise "failed to fetch schema from bigquery"
        else
          log.warn "#{table_id} uses previous schema"
        end
      end

      @last_fetch_schema_time = Fluent::Engine.now
    end
  end
end

#generate_table_id(table_id_format, current_time, row = nil, chunk = nil) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 310

def generate_table_id(table_id_format, current_time, row = nil, chunk = nil)
  format, col = table_id_format.split(/@/)
  time = if col && row
           keys = col.split('.')
           t = keys.inject(row[:json]) {|obj, attr| obj[attr.to_sym] }
           Time.at(t)
         else
           current_time
         end
  if row && format =~ /\$\{/
    format.gsub!(/\$\{\s*(\w+)\s*\}/) do |m|
      row[:json][$1.to_sym].to_s.gsub(/[^\w]/, '')
    end
  end
  table_id = time.strftime(format)

  if chunk
    table_id.gsub(%r(%{time_slice})) { |expr|
      chunk.key
    }
  else
    table_id.gsub(%r(%{time_slice})) { |expr|
      current_time.strftime(@time_slice_format)
    }
  end
end
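A rough walk-through with hypothetical inputs (the row hash mirrors the row[:json] access above; times shown assume UTC):

table_id_format = "access_${service}_%Y%m%d@created_at"
row = { json: { service: "web-api", created_at: 1500000000 } }

generate_table_id(table_id_format, Time.now, row)
# 1. split on "@": format "access_${service}_%Y%m%d", time column "created_at"
# 2. time is taken from row[:json][:created_at] -> 2017-07-14 (UTC)
# 3. ${service} expands to "web-api" with non-word characters stripped -> "webapi"
# => "access_webapi_20170714"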

#legacy_schema_config_deprecation ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 359

def legacy_schema_config_deprecation
  if [@field_string, @field_integer, @field_float, @field_boolean, @field_timestamp].any?
    warn "[DEPRECATION] `field_*` style schema config is deprecated. Instead of it, use `schema` config params that is array of json style."
  end
end

#replace_record_key(record) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 337

def replace_record_key(record)
  new_record = {}
  record.each do |key, _|
    new_key = key
    @regexps.each do |regexp, replacement|
      new_key = new_key.gsub(/#{regexp}/, replacement)
    end
    new_key = new_key.gsub(/\W/, '')
    new_record.store(new_key, record[key])
  end
  new_record
end
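For instance, assuming @regexps = { "-" => "_" } (e.g. a hypothetical replace_record_key_regexp1 of "- _"):

record = { "user-name" => "alice", "remote addr" => "127.0.0.1" }
replace_record_key(record)
# "-" becomes "_", then any remaining non-word characters are dropped:
# => {"user_name"=>"alice", "remoteaddr"=>"127.0.0.1"}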

#request_timeout_sec ⇒ Object

Timeout

request_timeout_sec
  Bigquery API response timeout

request_open_timeout_sec
  Bigquery API connection, and request timeout


# File 'lib/fluent/plugin/out_bigquery.rb', line 151

config_param :request_timeout_sec, :time, default: nil

#start ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 281

def start
  super

  @tables_queue = @tablelist.dup.shuffle
  @tables_mutex = Mutex.new
  @fetch_schema_mutex = Mutex.new

  @last_fetch_schema_time = 0
  fetch_schema(false) if @fetch_schema
end

#time_partitioning_type ⇒ Object

Partitioning



# File 'lib/fluent/plugin/out_bigquery.rb', line 155

config_param :time_partitioning_type, :enum, list: [:day], default: nil

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 365

def write(chunk)
  table_id_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end
  template_suffix_format = @template_suffix
  _write(chunk, table_id_format, template_suffix_format)
end
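The table selection above is a simple round-robin over @tables_queue; a standalone sketch (assuming tables "a,b,c"):

tables_queue = %w(a b c)
picks = 4.times.map do
  t = tables_queue.shift
  tables_queue.push t
  t
end
picks  # => ["a", "b", "c", "a"]  -- each chunk goes to the next table in the list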

#writer ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 292

def writer
  @writer ||= Fluent::BigQuery::Writer.new(@log, @auth_method, {
    private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
    max_bad_records: @max_bad_records,
    allow_retry_insert_errors: @allow_retry_insert_errors,
    prevent_duplicate_load: @prevent_duplicate_load,
    auto_create_table: @auto_create_table,
    time_partitioning_type: @time_partitioning_type,
    time_partitioning_expiration: @time_partitioning_expiration,
    timeout_sec: @request_timeout_sec,
    open_timeout_sec: @request_open_timeout_sec,
  })
end