Class: Fluent::BigQuery::Storage::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/bigquery/storage/writer.rb

Instance Method Summary collapse

Constructor Details

#initialize(log, auth_method, project, dataset, table, proto_descriptor, **options) ⇒ Writer

Returns a new instance of Writer.



5
6
7
8
9
10
11
12
13
14
15
# File 'lib/fluent/plugin/bigquery/storage/writer.rb', line 5

# Sets up the state needed to append rows to a table's default write stream.
#
# @param log [Object] logger kept in @log; must respond to +trace+ (used by #insert)
# @param auth_method [Object] stored as-is in @auth_method
#   (presumably consumed by the credential lookup defined elsewhere in this class)
# @param project [#to_s] project id, interpolated into the write stream path
# @param dataset [#to_s] dataset id, interpolated into the write stream path
# @param table [#to_s] table id, interpolated into the write stream path
# @param proto_descriptor [Object] descriptor handed to ProtoSchema as the writer schema
# @param options [Hash] remaining keyword arguments, stored untouched in @options
def initialize(log, auth_method, project, dataset, table, proto_descriptor, **options)
  @log = log
  @options = options
  @auth_method = auth_method
  @scope = "https://www.googleapis.com/auth/bigquery"

  # Every append goes to the table's "_default" stream.
  table_path = "projects/#{project}/datasets/#{dataset}/tables/#{table}"
  @write_stream = "#{table_path}/streams/_default"

  schema_class = Google::Cloud::Bigquery::Storage::V1::ProtoSchema
  @write_schema = schema_class.new(proto_descriptor: proto_descriptor)
end

Instance Method Details

#client ⇒ Object



17
18
19
20
21
# File 'lib/fluent/plugin/bigquery/storage/writer.rb', line 17

# Returns the BigQueryWrite client, building it on first use and caching it
# in @client for subsequent calls.
#
# The client's credentials come from +get_auth+, which is defined outside
# this excerpt.
#
# @return [Google::Cloud::Bigquery::Storage::V1::BigQueryWrite::Client]
def client
  return @client if @client

  @client = Google::Cloud::Bigquery::Storage::V1::BigQueryWrite::Client.new { |config| config.credentials = get_auth }
end

#insert(rows) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/bigquery/storage/writer.rb', line 23

# Sends one AppendRowsRequest carrying +rows+ to the configured write stream
# and traces every response yielded by the client.
#
# NOTE(review): responses are only logged at trace level here; nothing in
# this method inspects them for errors — confirm that is intended.
#
# @param rows [Object] passed as +serialized_rows+ of a ProtoRows message
#   (presumably an array of protobuf-encoded row strings — verify against caller)
# @return [Array] the results of +@log.trace+, one per response
def insert(rows)
  v1 = Google::Cloud::Bigquery::Storage::V1

  proto_rows = v1::ProtoRows.new(serialized_rows: rows)
  proto_data = v1::AppendRowsRequest::ProtoData.new(
    rows: proto_rows,
    writer_schema: @write_schema
  )
  request = v1::AppendRowsRequest.new(
    write_stream: @write_stream,
    proto_rows: proto_data
  )

  # append_rows is given a collection of requests; a single request is sent.
  responses = client.append_rows([request])
  responses.map { |response| @log.trace(response) }
end