Class: Fluent::Plugin::BigQueryStorageWriteInsertOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_bigquery_storage_write_insert.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 43

# Checks that the settings required by the selected +@auth_method+ are present.
#
# @param conf the plugin configuration; passed on to the parent class via +super+
# @raise [Fluent::ConfigError] if a required setting for the chosen auth
#   method is missing, or if +@auth_method+ is not one of the known values
def configure(conf)
  super

  case @auth_method
  when :compute_engine, :application_default
    # Nothing further is required for these auth methods.
  when :private_key
    if !@email || !@private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :json_key
    raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'" unless @json_key
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end
end

#format(tag, time, record) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 92

# Serializes one event into a newline-terminated JSON line.
#
# A nil record is reported with a warning and yields nil. Any error raised
# while building the line is logged together with the record and re-raised.
#
# @param tag the event tag
# @param time the event time
# @param record the event record, or nil
# @return [String, nil] the JSON line, or nil when +record+ is nil
def format(tag, time, record)
  if record.nil?
    log.warn("nil record detected. corrupted chunks? tag=#{tag}, time=#{time}")
    nil
  else
    # Reassign +record+ so the rescue below logs the injected version when
    # serialization is the step that fails.
    record = inject_values_to_record(tag, time, record)
    "#{record.to_json}\n"
  end
rescue StandardError
  log.error("format error", record: record)
  raise
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 116

# Unconditionally reports this output as usable with multiple workers.
# NOTE(review): how the flag is consumed is defined by the Fluentd plugin
# API, not by this file.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#start ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 64

# Loads the protobuf schema, resolves the message type for the target table
# and builds the writer used by #write.
#
# The message name is +@proto_message_class_name+ when set, otherwise the
# PascalCase form of +@table+. After this method succeeds, +@descriptor_proto+,
# +@klass+ and +@writer+ are populated.
#
# @raise [Fluent::UnrecoverableError] wrapping any error raised during setup
def start
  super

  message_cls_name = @proto_message_class_name ||
                     Fluent::BigQuery::Storage::Helper.snake_to_pascal(@table)

  descriptor_data = Fluent::BigQuery::Storage::Helper.get_descriptor_data(@proto_schema_rb_path)

  pool = Google::Protobuf::DescriptorPool.generated_pool
  pool.add_serialized_file(descriptor_data)
  parsed = Google::Protobuf::FileDescriptorProto.decode(descriptor_data)
  @descriptor_proto = parsed.message_type.find { |msg| msg.name == message_cls_name }
  raise "No descriptor proto found. class_name=#{message_cls_name}" if @descriptor_proto.nil?

  # The descriptor pool is keyed by fully-qualified name, while message_type
  # entries carry only the simple name, so a schema that declares a package
  # must be looked up as "<package>.<Message>".
  package = parsed.package.to_s
  full_name = package.empty? ? message_cls_name : "#{package}.#{message_cls_name}"
  descriptor = pool.lookup(full_name)
  # Fail with a descriptive message instead of a NoMethodError on nil.
  raise "No message descriptor found in pool. full_name=#{full_name}" if descriptor.nil?

  @klass = descriptor.msgclass

  @writer = Fluent::BigQuery::Storage::Writer.new(@log, @auth_method, @project, @dataset, @table, @descriptor_proto,
                                                  private_key_path: @private_key_path,
                                                  private_key_passphrase: @private_key_passphrase,
                                                  email: @email,
                                                  json_key: @json_key)
rescue => e
  # Attach the exception so the log shows what actually went wrong.
  log.error("initialize error", error: e)
  raise Fluent::UnrecoverableError, e.message
end

#write(chunk) ⇒ Object



105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 105

# Re-encodes every line of the chunk with +@klass+ and hands the resulting
# rows to +@writer+.
#
# Each line is decoded with +@klass.decode_json+ (honouring
# +@ignore_unknown_fields+) and then encoded with +@klass.encode+.
#
# @param chunk an object whose +open+ yields an enumerable of lines
# @return whatever +@writer.insert+ returns
def write(chunk)
  encoded_rows = chunk.open do |stream|
    stream.map do |json_line|
      @klass.encode(@klass.decode_json(json_line, ignore_unknown_fields: @ignore_unknown_fields))
    end
  end

  @writer.insert(encoded_rows)
end