Class: Fluent::BigQueryOutput

Inherits:
  TimeSlicedOutput < Object
Defined in:
lib/fluent/plugin/out_bigquery.rb

Overview

TODO: error classes for each API error response

  class BigQueryAPIError < StandardError
  end

Defined Under Namespace

Modules: InsertImplementation, LoadImplementation
Classes: BooleanFieldSchema, FieldSchema, FloatFieldSchema, IntegerFieldSchema, RecordSchema, StringFieldSchema, TimestampFieldSchema

Constant Summary

REGEXP_MAX_NUM = 10

TODO: record fields don't work well with streaming inserts?

  At table creation, table type json + field type record -> field type validation fails
  At streaming inserts, schema cannot be specified

  config_param :field_record, :string, default: nil
  config_param :optional_data_field, :string, default: nil

Instance Method Summary

Constructor Details

#initialize ⇒ BigQueryOutput

Table types: developers.google.com/bigquery/docs/tables

type - The following data types are supported; see Data Formats for details on each data type:
  STRING
  INTEGER
  FLOAT
  BOOLEAN
  RECORD - A JSON object, used when importing nested records. This type is only available when using JSON source files.

mode - Whether a field can be null. The following values are supported:
  NULLABLE - The cell can be null.
  REQUIRED - The cell cannot be null.
  REPEATED - Zero or more repeated simple or nested subfields. This mode is only supported when using JSON source files.
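
For illustration, a hypothetical schema file of the kind loaded through the schema_path option (see #configure), combining the types and modes above; all field names here are invented:

  [
    {"name": "time",   "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "status", "type": "INTEGER",   "mode": "NULLABLE"},
    {"name": "user",   "type": "RECORD",    "mode": "NULLABLE",
     "fields": [
       {"name": "name", "type": "STRING", "mode": "NULLABLE"},
       {"name": "tags", "type": "STRING", "mode": "REPEATED"}
     ]}
  ]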



# File 'lib/fluent/plugin/out_bigquery.rb', line 147

def initialize
  super
  require 'json'
  require 'google/apis/bigquery_v2'
  require 'googleauth'
  require 'active_support/json'
  require 'active_support/core_ext/hash'
  require 'active_support/core_ext/object/json'

  # MEMO: signet-0.6.1 depends on Faraday.default_connection
  Faraday.default_connection.options.timeout = 60
end

Instance Method Details

#client ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 260

def client
  return @cached_client if @cached_client && @cached_client_expiration > Time.now

  client = Google::Apis::BigqueryV2::BigqueryService.new

  scope = "https://www.googleapis.com/auth/bigquery"

  case @auth_method
  when 'private_key'
    require 'google/api_client/auth/key_utils'
    key = Google::APIClient::KeyUtils.load_from_pkcs12(@private_key_path, @private_key_passphrase)
    auth = Signet::OAuth2::Client.new(
            token_credential_uri: "https://accounts.google.com/o/oauth2/token",
            audience: "https://accounts.google.com/o/oauth2/token",
            scope: scope,
            issuer: @email,
            signing_key: key)

  when 'compute_engine'
    auth = Google::Auth::GCECredentials.new

  when 'json_key'
    if File.exist?(@json_key)
      auth = File.open(@json_key) do |f|
        Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: f, scope: scope)
      end
    else
      key = StringIO.new(@json_key)
      auth = Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: scope)
    end

  when 'application_default'
    auth = Google::Auth.get_application_default([scope])

  else
    raise ConfigError, "Unknown auth method: #{@auth_method}"
  end

  client.authorization = auth

  @cached_client_expiration = Time.now + 1800
  @cached_client = client
end
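
As the File.exist? branch shows, json_key accepts either a path to the service account key file or the key's JSON content inline. A minimal configuration sketch of the two forms (paths and values are hypothetical):

  auth_method json_key
  json_key /path/to/service_account.json

  # or, with the key content inline:
  auth_method json_key
  json_key {"client_email": "foo@developer.gserviceaccount.com", "private_key": "..."}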

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 165

def configure(conf)
  super

  if @method == "insert"
    extend(InsertImplementation)
  elsif @method == "load"
    require 'digest/sha1'
    extend(LoadImplementation)
  else
    raise Fluent::ConfigError, "'method' must be 'insert' or 'load'"
  end

  case @auth_method
  when 'private_key'
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when 'compute_engine'
    # Do nothing
  when 'json_key'
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when 'application_default'
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "exactly one of 'table' or 'tables' must be specified"
  end

  @tablelist = @tables ? @tables.split(',') : [@table]

  @fields = RecordSchema.new('record')
  if @schema_path
    @fields.load_schema(JSON.parse(File.read(@schema_path)))
  end

  types = %w(string integer float boolean timestamp)
  types.each do |type|
    raw_fields = instance_variable_get("@field_#{type}")
    next unless raw_fields
    raw_fields.split(',').each do |field|
      @fields.register_field field.strip, type.to_sym
    end
  end

  @regexps = {}
  (1..REGEXP_MAX_NUM).each do |i|
    next unless conf["replace_record_key_regexp#{i}"]
    regexp, replacement = conf["replace_record_key_regexp#{i}"].split(/ /, 2)
    raise ConfigError, "replace_record_key_regexp#{i} does not contain 2 parameters" unless replacement
    raise ConfigError, "replace_record_key_regexp#{i} contains a duplicated key, #{regexp}" if @regexps[regexp]
    @regexps[regexp] = replacement
  end

  @localtime = false if @localtime.nil? && @utc

  @timef = TimeFormatter.new(@time_format, @localtime)

  if @time_field
    keys = @time_field.split('.')
    last_key = keys.pop
    @add_time_field = ->(record, time) {
      keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = @timef.format(time)
      record
    }
  else
    @add_time_field = ->(record, time) { record }
  end

  if @insert_id_field
    insert_id_keys = @insert_id_field.split('.')
    @get_insert_id = ->(record) {
      insert_id_keys.inject(record) {|h, k| h[k] }
    }
  else
    @get_insert_id = nil
  end
end
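
A minimal configuration sketch exercising the parameters validated above (the tag, project, dataset, table name, and field lists are hypothetical; the syntax assumes the Fluentd v0.12 era this plugin targets):

  <match dummy>
    type bigquery

    method insert            # or 'load'
    auth_method private_key  # or 'compute_engine', 'json_key', 'application_default'
    email foo@developer.gserviceaccount.com
    private_key_path /path/to/key.p12

    project yourproject_id
    dataset yourdataset_id
    table accesslog_%Y%m%d   # exactly one of 'table' or 'tables'

    time_format %s
    time_field time
    insert_id_field uuid     # dotted paths such as data.uuid also work

    field_integer seq,status,bytes
    field_string vhost,path,method,agent
    replace_record_key_regexp1 - _
  </match>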

#create_table(table_id) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 317

def create_table(table_id)
  client.insert_table(@project, @dataset, {
    table_reference: {
      table_id: table_id,
    },
    schema: {
      fields: @fields.to_a,
    }
  }, {})
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil

  message = e.message
  if e.status_code == 409 && /Already Exists:/ =~ message
    # ignore 'Already Exists' error
    return
  end
  log.error "tables.insert API", :project_id => @project, :dataset => @dataset, :table => table_id, :code => e.status_code, :message => message
  raise "failed to create table in bigquery" # TODO: error class
end
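
Note that a 409 response matching 'Already Exists:' is swallowed, so calling create_table for a table that already exists is safe; any other API error clears the cached client and raises.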

#fetch_schema ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 362

def fetch_schema
  table_id_format = @tablelist[0]
  table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now))
  res = client.get_table(@project, @dataset, table_id)

  schema = res.schema.fields.as_json
  log.debug "Load schema from BigQuery: #{@project}:#{@dataset}.#{table_id} #{schema}"
  @fields.load_schema(schema, false)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil
  message = e.message
  log.error "tables.get API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: message
  raise "failed to fetch schema from bigquery" # TODO: error class
end
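
The schema is fetched from the first entry of the configured table list, with the table id expanded for the current time, so with date-sharded table names the shard for "now" needs to exist when fetch_schema runs.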

#generate_table_id(table_id_format, current_time, chunk = nil) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 304

def generate_table_id(table_id_format, current_time, chunk = nil)
  table_id = current_time.strftime(table_id_format)
  if chunk
    table_id.gsub(%r(%{time_slice})) { |expr|
      chunk.key
    }
  else
    table_id.gsub(%r(%{time_slice})) { |expr|
      current_time.strftime(@time_slice_format)
    }
  end
end
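
A quick sketch of the expansion (formats and times are hypothetical; %{time_slice} passes through strftime untouched and is then substituted):

  generate_table_id("accesslog_%Y_%m", Time.local(2016, 3, 1))
  # => "accesslog_2016_03"

  # assuming time_slice_format is %Y%m%d and no chunk is given:
  generate_table_id("accesslog_%{time_slice}", Time.local(2016, 3, 1))
  # => "accesslog_20160301"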

#replace_record_key(record) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 339

def replace_record_key(record)
  new_record = {}
  record.each do |key, _|
    new_key = key
    @regexps.each do |regexp, replacement|
      new_key = new_key.gsub(/#{regexp}/, replacement)
    end
    new_key = new_key.gsub(/\W/, '')
    new_record.store(new_key, record[key])
  end
  new_record
end
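
For illustration, assuming a hypothetical replace_record_key_regexp1 of "- _" (so @regexps is { '-' => '_' }):

  replace_record_key({ 'user-name' => 'bob', 'status!' => 200 })
  # => { 'user_name' => 'bob', 'status' => 200 }
  # '-' is rewritten to '_', then the final gsub(/\W/, '') strips any
  # remaining non-word characters such as '!'.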

#start ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 248

def start
  super

  @cached_client = nil
  @cached_client_expiration = nil

  @tables_queue = @tablelist.dup.shuffle
  @tables_mutex = Mutex.new

  fetch_schema() if @fetch_schema
end
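
The table list is shuffled once at startup, presumably so that multiple plugin instances rotating through the same 'tables' list do not all begin with the same table.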

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 352

def write(chunk)
  table_id_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end
  table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now), chunk)
  _write(chunk, table_id)
end
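
Each call shifts the head of @tables_queue and pushes it back under @tables_mutex, so successive chunks are distributed round-robin across the configured tables. The actual write is delegated to _write, which is supplied by InsertImplementation or LoadImplementation depending on the 'method' setting.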