Class: Fluent::Plugin::BigQueryBaseOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_bigquery_base.rb

Overview

This class is an abstract class.

Direct Known Subclasses

BigQueryInsertOutput, BigQueryLoadOutput

Instance Method Summary collapse

Instance Method Details

#clustering_fields ⇒ Object

Clustering



77
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 77

# Clustering: optional list of column names for the destination table;
# passed through to Fluent::BigQuery::Writer as +clustering_fields+.
config_param :clustering_fields, :array, default: nil

#configure(conf) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 84

# Validate plugin configuration and set up derived state.
#
# Checks that the selected auth_method has its required credentials,
# that exactly one of 'table'/'tables' is given, loads the inline
# schema (if any), and creates the record formatter.
#
# @param conf [Fluent::Config::Element] parsed plugin configuration
# @raise [Fluent::ConfigError] on invalid or incomplete configuration
def configure(conf)
  super

  # Each auth method requires a different set of credentials.
  case @auth_method
  when :private_key
    raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'" unless @email && @private_key_path
  when :json_key
    raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'" unless @json_key
  when :compute_engine, :application_default
    # Nothing extra to validate for these methods.
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  # XOR: exactly one of 'table' / 'tables' must be configured.
  raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid" unless @table.nil? ^ @tables.nil?

  @tablelist = @tables || [@table]

  # Inline <schema> (if present) is loaded into the shared record schema.
  @table_schema = Fluent::BigQuery::RecordSchema.new('record')
  @table_schema.load_schema(@schema) if @schema

  format_section = conf.elements("format")[0]
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', default_type: 'json', conf: format_section)
end

#fetch_schema(metadata) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 182

# Fetch the destination table's schema from BigQuery, caching it per
# "project.dataset.table" for @schema_cache_expire seconds. When the
# fetch fails, falls back to the previously cached schema (logging a
# warning), and raises if no schema was ever fetched for that table.
#
# NOTE(review): the extracted snippet had dropped the `metadata`
# argument (`extract_placeholders(@project, )`); restored to match the
# documented signature #fetch_schema(metadata).
#
# @param metadata [Fluent::Plugin::Buffer::Metadata] chunk metadata used
#   to expand placeholders in project/dataset/table names
# @return [Fluent::BigQuery::RecordSchema] the (possibly cached) schema
def fetch_schema(metadata)
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = fetch_schema_target_table(metadata)
  cache_key = "#{project}.#{dataset}.#{table_id}"

  if Fluent::Engine.now - @last_fetch_schema_time[cache_key] > @schema_cache_expire
    schema = writer.fetch_schema(project, dataset, table_id)

    if schema
      table_schema = Fluent::BigQuery::RecordSchema.new("record")
      table_schema.load_schema(schema)
      @fetched_schemas[cache_key] = table_schema
    else
      # Fetch failed: reuse the stale schema if we have one, otherwise give up.
      if @fetched_schemas[cache_key].nil?
        raise "failed to fetch schema from bigquery"
      else
        log.warn "#{table_id} uses previous schema"
      end
    end

    @last_fetch_schema_time[cache_key] = Fluent::Engine.now
  end

  @fetched_schemas[cache_key]
end

#fetch_schema_target_table(metadata) ⇒ Object



209
210
211
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 209

# Resolve which table's schema to fetch: @fetch_schema_table when
# configured, otherwise the first configured table. Placeholders are
# expanded against the chunk metadata.
#
# NOTE(review): restored the `metadata` argument dropped during
# extraction (documented signature: #fetch_schema_target_table(metadata)).
#
# @param metadata [Fluent::Plugin::Buffer::Metadata] chunk metadata
# @return [String] resolved table id
def fetch_schema_target_table(metadata)
  extract_placeholders(@fetch_schema_table || @tablelist[0], metadata)
end

#format(tag, time, record) ⇒ Object

Formatter



80
81
82
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 80

# Formatter: <format> section controls record serialization; the 'json'
# formatter is used when no <format> block is configured.
config_section :format do
  config_set_default :@type, 'json'
end

#get_schema(project, dataset, metadata) ⇒ Object



228
229
230
231
232
233
234
235
236
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 228

# Return the schema to use for the given project/dataset, preferring in
# order: a schema fetched from BigQuery (@fetch_schema), a schema read
# from @schema_path, or the inline schema configured on the plugin.
#
# NOTE(review): the extracted snippet had an invalid definition
# `def get_schema(project, dataset, )`; restored the `metadata`
# parameter per the documented signature #get_schema(project, dataset, metadata).
#
# @param project [String] expanded project id
# @param dataset [String] expanded dataset id
# @param metadata [Fluent::Plugin::Buffer::Metadata] chunk metadata
# @return [Fluent::BigQuery::RecordSchema]
def get_schema(project, dataset, metadata)
  if @fetch_schema
    @fetched_schemas["#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"] || fetch_schema(metadata)
  elsif @schema_path
    @read_schemas[read_schema_target_path(metadata)] || read_schema(metadata)
  else
    @table_schema
  end
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 129

# This plugin can run under Fluentd's multi-worker mode.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#read_schema(metadata) ⇒ Object



213
214
215
216
217
218
219
220
221
222
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 213

# Load the table schema from the configured schema file (JSON), caching
# the parsed result per resolved path in @read_schemas.
#
# NOTE(review): restored the `metadata` argument dropped during
# extraction (documented signature: #read_schema(metadata)).
#
# @param metadata [Fluent::Plugin::Buffer::Metadata] chunk metadata used
#   to expand placeholders in @schema_path
# @return [Fluent::BigQuery::RecordSchema] cached or freshly loaded schema
def read_schema(metadata)
  schema_path = read_schema_target_path(metadata)

  # Memoize per path: parse the file only on first access.
  @read_schemas[schema_path] ||= begin
    table_schema = Fluent::BigQuery::RecordSchema.new("record")
    table_schema.load_schema(MultiJson.load(File.read(schema_path)))
    table_schema
  end
end

#read_schema_target_path(metadata) ⇒ Object



224
225
226
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 224

# Expand placeholders in @schema_path against the chunk metadata.
#
# NOTE(review): restored the `metadata` argument dropped during
# extraction (documented signature: #read_schema_target_path(metadata)).
#
# @param metadata [Fluent::Plugin::Buffer::Metadata] chunk metadata
# @return [String] resolved schema file path
def read_schema_target_path(metadata)
  extract_placeholders(@schema_path, metadata)
end

#request_timeout_sec ⇒ Object

Timeout request_timeout_sec

Bigquery API response timeout

request_open_timeout_sec

Bigquery API connection, and request timeout


68
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 68

# Timeout for the BigQuery API response; nil leaves the client default.
# Passed to Fluent::BigQuery::Writer as +timeout_sec+.
config_param :request_timeout_sec, :time, default: nil

#start ⇒ Object



119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 119

# Initialize per-instance runtime state when the plugin starts.
def start
  super

  # Shuffle the table rotation queue so instances don't all begin with
  # the same table; the mutex guards access to that queue.
  @tables_mutex = Mutex.new
  @tables_queue = @tablelist.shuffle

  # Schema caches: fetched-from-BigQuery, read-from-file, and the last
  # fetch timestamp per table (default 0 forces an initial fetch).
  @fetched_schemas = {}
  @read_schemas = {}
  @last_fetch_schema_time = Hash.new(0)
end

#time_partitioning_type ⇒ Object

Partitioning



72
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 72

# Partitioning: time-based partitioning type for the destination table;
# only :day is accepted, nil disables it. Forwarded to the writer.
config_param :time_partitioning_type, :enum, list: [:day], default: nil

#write(chunk) ⇒ Object



179
180
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 179

# Abstract hook: subclasses (BigQueryInsertOutput / BigQueryLoadOutput)
# implement the actual delivery of the buffered chunk.
def write(chunk)
end

#writer ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 133

# Lazily build (and memoize) the BigQuery writer configured from this
# plugin's parameters.
#
# @return [Fluent::BigQuery::Writer]
def writer
  return @writer if @writer

  # Collect all writer-relevant plugin parameters in one place.
  writer_options = {
    private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
    location: @location,
    source_format: @source_format,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
    max_bad_records: @max_bad_records,
    allow_retry_insert_errors: @allow_retry_insert_errors,
    prevent_duplicate_load: @prevent_duplicate_load,
    auto_create_table: @auto_create_table,
    time_partitioning_type: @time_partitioning_type,
    time_partitioning_field: @time_partitioning_field,
    time_partitioning_expiration: @time_partitioning_expiration,
    require_partition_filter: @require_partition_filter,
    clustering_fields: @clustering_fields,
    timeout_sec: @request_timeout_sec,
    open_timeout_sec: @request_open_timeout_sec,
  }

  @writer = Fluent::BigQuery::Writer.new(@log, @auth_method, writer_options)
end