Class: Fluent::BigQuery::Storage::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/bigquery/storage/writer.rb

Instance Method Summary collapse

Constructor Details

#initialize(log, auth_method, project, dataset, table, proto_descriptor, **options) ⇒ Writer

Returns a new instance of Writer.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/fluent/plugin/bigquery/storage/writer.rb', line 5

def initialize(log, auth_method, project, dataset, table, proto_descriptor, **options)
  @auth_method = auth_method
  @scope = "https://www.googleapis.com/auth/bigquery"
  @options = options
  @log = log

  @base_append_rows_request = Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest.new(
    write_stream: "projects/#{project}/datasets/#{dataset}/tables/#{table}/streams/_default",
    proto_rows: Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest::ProtoData.new(
      rows: Google::Cloud::Bigquery::Storage::V1::ProtoRows.new,
      writer_schema: Google::Cloud::Bigquery::Storage::V1::ProtoSchema.new(
        proto_descriptor: proto_descriptor
      )
    )
  )
end

Instance Method Details

#clientObject



22
23
24
25
26
# File 'lib/fluent/plugin/bigquery/storage/writer.rb', line 22

def client
  @client ||= Google::Cloud::Bigquery::Storage::V1::BigQueryWrite::Client.new do |cf|
    cf.credentials = get_auth
  end
end

#insert(rows) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/fluent/plugin/bigquery/storage/writer.rb', line 28

def insert(rows)
  data = Google::Protobuf.deep_copy(@base_append_rows_request)
  data.proto_rows.rows.serialized_rows += rows
  client.append_rows([data]).each do |e|
    @log.trace(e)
  end
end