Class: Fluent::Plugin::BigQueryBaseOutput

Inherits:
Output < Object
Defined in:
lib/fluent/plugin/out_bigquery_base.rb

Overview

This class is an abstract class.

Direct Known Subclasses

BigQueryInsertOutput, BigQueryLoadOutput

Instance Method Summary

Instance Method Details

#clustering_fields ⇒ Object

Clustering



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 74

config_param :clustering_fields, :array, default: nil

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 81

def configure(conf)
  super

  case @auth_method
  when :private_key
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :compute_engine
    # Do nothing
  when :json_key
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when :application_default
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables ? @tables : [@table]

  @table_schema = Fluent::BigQuery::RecordSchema.new('record')
  if @schema
    @table_schema.load_schema(@schema)
  end

  formatter_config = conf.elements("format")[0]
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', default_type: 'json', conf: formatter_config)
end
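
The `@table.nil? ^ @tables.nil?` guard uses XOR, so the configuration is valid only when exactly one of 'table' and 'tables' is set. A minimal truth table in plain Ruby (the values are illustrative, not the plugin's API):

[
  [nil, nil],
  ['access_log', nil],
  [nil, ['log_a', 'log_b']],
  ['access_log', ['log_a', 'log_b']],
].each do |table, tables|
  valid = table.nil? ^ tables.nil?  # true only when exactly one is set
  puts "table=#{table.inspect} tables=#{tables.inspect} valid=#{valid}"
end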

#fetch_schema(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 183

def fetch_schema(metadata)
  table_id = nil
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = fetch_schema_target_table(metadata)

  if Fluent::Engine.now - @last_fetch_schema_time["#{project}.#{dataset}.#{table_id}"] > @schema_cache_expire
    schema = writer.fetch_schema(project, dataset, table_id)

    if schema
      table_schema = Fluent::BigQuery::RecordSchema.new("record")
      table_schema.load_schema(schema)
      @fetched_schemas["#{project}.#{dataset}.#{table_id}"] = table_schema
    else
      if @fetched_schemas["#{project}.#{dataset}.#{table_id}"].nil?
        raise "failed to fetch schema from bigquery"
      else
        log.warn "#{table_id} uses previous schema"
      end
    end

    @last_fetch_schema_time["#{project}.#{dataset}.#{table_id}"] = Fluent::Engine.now
  end

  @fetched_schemas["#{project}.#{dataset}.#{table_id}"]
end
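
Note that @last_fetch_schema_time is created as Hash.new(0) in #start, so an unseen table key yields 0 and the very first call always refreshes the schema. A minimal sketch of that expiry check, with a hypothetical key and expiry:

last_fetch = Hash.new(0)                  # as initialized in #start
schema_cache_expire = 600                 # hypothetical expiry in seconds
key = "myproject.mydataset.access_log"    # hypothetical table key
now = Time.now.to_i
puts now - last_fetch[key] > schema_cache_expire  # => true: first lookup fetches
last_fetch[key] = now
puts now - last_fetch[key] > schema_cache_expire  # => false: cached until expiry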

#fetch_schema_target_table(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 210

def fetch_schema_target_table(metadata)
  extract_placeholders(@fetch_schema_table || @tablelist[0], metadata)
end

#format(tag, time, record) ⇒ Object

Formatter



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 77

config_section :format do
  config_set_default :@type, 'json'
end
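
With the default json formatter, each event record is serialized as one JSON object per line before buffering. A rough illustration of that output shape (the record contents are invented):

require 'json'
record = { 'status' => 200, 'path' => '/index.html' }  # hypothetical event
puts JSON.generate(record)  # => {"status":200,"path":"/index.html"}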

#get_schema(project, dataset, metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 229

def get_schema(project, dataset, metadata)
  if @fetch_schema
    @fetched_schemas["#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"] || fetch_schema(metadata)
  elsif @schema_path
    @read_schemas[read_schema_target_path(metadata)] || read_schema(metadata)
  else
    @table_schema
  end
end
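
The method encodes a precedence: a schema fetched live from BigQuery wins, then one read from schema_path, then the inline schema parsed in #configure. A condensed sketch of that fallback chain with stubbed sources (all names and values here are illustrative):

fetch_schema_enabled = false   # stand-in for @fetch_schema
schema_path          = nil     # stand-in for @schema_path

schema =
  if fetch_schema_enabled
    'schema fetched from the BigQuery API'
  elsif schema_path
    'schema read from the file at schema_path'
  else
    'inline schema parsed in #configure'
  end
puts schema  # => "inline schema parsed in #configure"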

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/out_bigquery_base.rb', line 126

def multi_workers_ready?
  true
end

#read_schema(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 214

def read_schema(metadata)
  schema_path = read_schema_target_path(metadata)

  unless @read_schemas[schema_path]
    table_schema = Fluent::BigQuery::RecordSchema.new("record")
    table_schema.load_schema(MultiJson.load(File.read(schema_path)))
    @read_schemas[schema_path] = table_schema
  end
  @read_schemas[schema_path]
end
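
The file at schema_path is expected to contain a BigQuery JSON schema: an array of field definitions. A self-contained sketch of what #read_schema does with such a file, using an invented schema (requires the multi_json gem):

require 'multi_json'

# Hypothetical schema in BigQuery's JSON schema format; field names and
# types are illustrative only.
schema_json = <<~JSON
  [
    {"name": "time",    "type": "TIMESTAMP"},
    {"name": "status",  "type": "INTEGER"},
    {"name": "message", "type": "STRING"}
  ]
JSON

fields = MultiJson.load(schema_json)
fields.each { |f| puts "#{f['name']}: #{f['type']}" }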

#read_schema_target_path(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 225

def read_schema_target_path(metadata)
  extract_placeholders(@schema_path, metadata)
end

#request_timeout_sec ⇒ Object

Timeout settings:

request_timeout_sec
  BigQuery API response timeout.

request_open_timeout_sec
  BigQuery API connection and request timeout.


# File 'lib/fluent/plugin/out_bigquery_base.rb', line 65

config_param :request_timeout_sec, :time, default: nil

#start ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 116

def start
  super

  @tables_queue = @tablelist.shuffle
  @tables_mutex = Mutex.new
  @fetched_schemas = {}
  @last_fetch_schema_time = Hash.new(0)
  @read_schemas = {}
end
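
The shuffled @tables_queue and @tables_mutex set up thread-safe round-robin rotation over the configured tables; the rotation itself happens in subclasses. A hedged sketch of that pattern (table names invented):

queue = %w[logs_a logs_b logs_c].shuffle  # as in @tables_queue
mutex = Mutex.new                         # as in @tables_mutex
3.times do
  table = mutex.synchronize do
    t = queue.shift
    queue.push(t)  # rotate: move the table to the back of the queue
    t
  end
  puts "writing next chunk to #{table}"
end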

#time_partitioning_type ⇒ Object

Partitioning



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 69

config_param :time_partitioning_type, :enum, list: [:day, :hour], default: nil

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 180

def write(chunk)
  # no-op: concrete delivery is implemented by subclasses
end
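
#write is intentionally empty here; BigQueryInsertOutput and BigQueryLoadOutput supply the real delivery logic. A dependency-free sketch of that shape (class names and method bodies are illustrative, not the gem's code):

class BaseOutputSketch
  def write(chunk)
    # no-op in the abstract base
  end
end

class InsertOutputSketch < BaseOutputSketch
  def write(chunk)
    puts "streaming #{chunk.size} rows via the insertAll API"
  end
end

InsertOutputSketch.new.write(Array.new(3))  # => streaming 3 rows via the insertAll API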

#writer ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 130

def writer
  @writer ||= Fluent::BigQuery::Writer.new(@log, @auth_method,
    private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
    source_format: @source_format,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
    max_bad_records: @max_bad_records,
    allow_retry_insert_errors: @allow_retry_insert_errors,
    prevent_duplicate_load: @prevent_duplicate_load,
    auto_create_table: @auto_create_table,
    time_partitioning_type: @time_partitioning_type,
    time_partitioning_field: @time_partitioning_field,
    time_partitioning_expiration: @time_partitioning_expiration,
    require_partition_filter: @require_partition_filter,
    clustering_fields: @clustering_fields,
    timeout_sec: @request_timeout_sec,
    open_timeout_sec: @request_open_timeout_sec,
  )
end
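
The ||= makes #writer a lazy singleton per plugin instance: the Fluent::BigQuery::Writer is built on first use and reused afterwards. The pattern in isolation (names invented):

def client
  @client ||= begin
    puts 'constructing client'  # runs only on the first call
    Object.new
  end
end

client
client  # 'constructing client' prints only once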