Class: Fluent::Plugin::BigQueryBaseOutput

Inherits:
Output
  • Object
Defined in:
lib/fluent/plugin/out_bigquery_base.rb

Overview

This class is an abstract class.

Direct Known Subclasses

BigQueryInsertOutput, BigQueryLoadOutput

Instance Method Summary

Instance Method Details

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 79

def configure(conf)
  super

  case @auth_method
  when :private_key
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :compute_engine
    # Do nothing
  when :json_key
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when :application_default
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables ? @tables : [@table]

  @table_schema = Fluent::BigQuery::RecordSchema.new('record')
  if @schema
    @table_schema.load_schema(@schema)
  end
  if @schema_path
    @table_schema.load_schema(MultiJson.load(File.read(@schema_path)))
  end

  formatter_config = conf.elements("format")[0]
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', default_type: 'json', conf: formatter_config)
end
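
The @table.nil? ^ @tables.nil? guard uses Ruby's boolean XOR: the expression is true only when exactly one of the two options is set, so the unless raises both when neither is configured and when both are. A standalone illustration of the same check (the values are hypothetical):

[
  [nil,      nil       ],  # neither set -> ConfigError
  ["events", nil       ],  # only table  -> valid
  [nil,      ["a", "b"]],  # only tables -> valid
  ["events", ["a", "b"]],  # both set    -> ConfigError
].each do |table, tables|
  valid = table.nil? ^ tables.nil?
  puts format("table=%-9p tables=%-11p %s", table, tables, valid ? "valid" : "ConfigError")
end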

#fetch_schema(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 175

def fetch_schema(metadata)
  table_id = nil
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = fetch_schema_target_table(metadata)

  if Fluent::Engine.now - @last_fetch_schema_time["#{project}.#{dataset}.#{table_id}"] > @schema_cache_expire
    schema = writer.fetch_schema(project, dataset, table_id)

    if schema
      table_schema = Fluent::BigQuery::RecordSchema.new("record")
      table_schema.load_schema(schema)
      @fetched_schemas["#{project}.#{dataset}.#{table_id}"] = table_schema
    else
      if @fetched_schemas["#{project}.#{dataset}.#{table_id}"].nil?
        raise "failed to fetch schema from bigquery"
      else
        log.warn "#{table_id} uses previous schema"
      end
    end

    @last_fetch_schema_time["#{project}.#{dataset}.#{table_id}"] = Fluent::Engine.now
  end

  @fetched_schemas["#{project}.#{dataset}.#{table_id}"]
end
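
Because @last_fetch_schema_time is initialized as Hash.new(0) in #start, an unseen "project.dataset.table" key reads as time zero, so the first call always fetches; afterwards the schema is refreshed only once schema_cache_expire seconds have elapsed, and a failed refresh falls back to the previously cached schema with a warning. A minimal standalone sketch of the same expiring-cache pattern (all names here are illustrative, not the plugin's API):

cache      = {}
last_fetch = Hash.new(0)  # unseen keys look "never fetched"
expire     = 600          # stands in for schema_cache_expire (seconds)

lookup = lambda do |key|
  if Time.now.to_i - last_fetch[key] > expire
    cache[key] = "schema for #{key}"  # stands in for writer.fetch_schema
    last_fetch[key] = Time.now.to_i
  end
  cache[key]
end

lookup.call("proj.dataset.events")  # first call fetches
lookup.call("proj.dataset.events")  # cached for the next 600 seconds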

#fetch_schema_target_table(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 202

def fetch_schema_target_table(metadata)
  extract_placeholders(@fetch_schema_table || @tablelist[0], metadata)
end

#format(tag, time, record) ⇒ Object

Formatter



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 75

config_section :format do
  config_set_default :@type, 'json'
end
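
With the default json type, each record passed through #format is serialized as one newline-delimited JSON line, which is the shape the insert/load subclasses buffer. A rough standalone approximation of that output, not the plugin's actual code path:

require "json"

record = { "user" => "alice", "count" => 3 }
line = record.to_json + "\n"
print line  # => {"user":"alice","count":3}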

#get_schema(project, dataset, metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 206

def get_schema(project, dataset, metadata)
  if @fetch_schema
    @fetched_schemas["#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"] || fetch_schema(metadata)
  else
    @table_schema
  end
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/out_bigquery_base.rb', line 126

def multi_workers_ready?
  true
end

#request_timeout_sec ⇒ Object

Timeout

request_timeout_sec: BigQuery API response timeout

request_open_timeout_sec: BigQuery API connection and request timeout


# File 'lib/fluent/plugin/out_bigquery_base.rb', line 65

config_param :request_timeout_sec, :time, default: nil

#start ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 117

def start
  super

  @tables_queue = @tablelist.shuffle
  @tables_mutex = Mutex.new
  @fetched_schemas = {}
  @last_fetch_schema_time = Hash.new(0)
end

#time_partitioning_type ⇒ Object

Partitioning



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 69

config_param :time_partitioning_type, :enum, list: [:day], default: nil

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 172

def write(chunk)
end
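
#write is a deliberate no-op in this abstract base; the subclasses listed above (BigQueryInsertOutput, BigQueryLoadOutput) override it to deliver each buffered chunk. A hypothetical minimal subclass might look like this (the class name, plugin name, and log line are illustrative and not part of this gem):

require "fluent/plugin/out_bigquery_base"

module Fluent::Plugin
  class BigQueryExampleOutput < BigQueryBaseOutput
    Fluent::Plugin.register_output("bigquery_example", self)

    def write(chunk)
      # Resolve placeholders against the chunk's metadata, as
      # #fetch_schema does, then hand the rows to the shared client
      # built by #writer (a real subclass would call it here).
      project  = extract_placeholders(@project, chunk.metadata)
      dataset  = extract_placeholders(@dataset, chunk.metadata)
      table_id = extract_placeholders(@tablelist[0], chunk.metadata)
      log.info "would deliver #{chunk.size} bytes to #{project}.#{dataset}.#{table_id}"
    end
  end
end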

#writer ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 130

def writer
  @writer ||= Fluent::BigQuery::Writer.new(@log, @auth_method, {
    private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
    source_format: @source_format,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
    max_bad_records: @max_bad_records,
    allow_retry_insert_errors: @allow_retry_insert_errors,
    prevent_duplicate_load: @prevent_duplicate_load,
    auto_create_table: @auto_create_table,
    time_partitioning_type: @time_partitioning_type,
    time_partitioning_field: @time_partitioning_field,
    time_partitioning_expiration: @time_partitioning_expiration,
    time_partitioning_require_partition_filter: @time_partitioning_require_partition_filter,
    timeout_sec: @request_timeout_sec,
    open_timeout_sec: @request_open_timeout_sec,
  })
end
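
The ||= means the Fluent::BigQuery::Writer is built once per plugin instance, and every later call returns the same object, so credentials are not re-read per chunk. A generic, self-contained demo of that memoization pattern (Client is a stand-in, not this gem's class):

class Client; end

def writer
  @writer ||= Client.new  # constructed on first call only
end

writer.equal?(writer)  # => true: both calls return the same object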