Class: Fluent::Plugin::BigQueryStorageWriteInsertOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_bigquery_storage_write_insert.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 43

def configure(conf)
  super

  case @auth_method
  when :private_key
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :compute_engine
    # Do nothing
  when :json_key
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when :application_default
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end
end

#format(tag, time, record) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 96

def format(tag, time, record)
  if record.nil?
    log.warn("nil record detected. corrupted chunks? tag=#{tag}, time=#{time}")
    return
  end

  record = inject_values_to_record(tag, time, record)
  record.to_json + "\n"
rescue
  log.error("format error", record: record)
  raise
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


122
123
124
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 122

def multi_workers_ready?
  true
end

#startObject



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 64

def start
  super

  message_cls_name = @proto_message_class_name
  if message_cls_name.nil?
    message_cls_name = Fluent::BigQuery::Storage::Helper.snake_to_pascal(@table)
  end

  descriptor_data = Fluent::BigQuery::Storage::Helper.get_descriptor_data(@proto_schema_rb_path)

  parsed = Google::Protobuf::FileDescriptorProto.decode(descriptor_data)
  @descriptor_proto = parsed.message_type.find { |msg| msg.name == message_cls_name }
  if @descriptor_proto.nil?
    raise "No descriptor proto found. class_name=#{message_cls_name}"
  end

  begin
    Google::Protobuf::DescriptorPool.generated_pool.add_serialized_file(descriptor_data)
  rescue Google::Protobuf::TypeError => e
    log.warn("unable to build file to DescriptorPool. duplicate proto file? you have to restart fluentd process to reload proto.")
  end
  @klass = Google::Protobuf::DescriptorPool.generated_pool.lookup(message_cls_name).msgclass

  @writer = Fluent::BigQuery::Storage::Writer.new(@log, @auth_method, @project, @dataset, @table, @descriptor_proto,
                                                  private_key_path: @private_key_path,
                                                  private_key_passphrase: @private_key_passphrase,
                                                  email: @email,
                                                  json_key: @json_key)
rescue => e
  raise Fluent::UnrecoverableError.new(e)
end

#write(chunk) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 109

def write(chunk)
  rows = chunk.open do |io|
    io.map do |line|
      val = @klass.decode_json(line, ignore_unknown_fields: @ignore_unknown_fields)
      @klass.encode(val)
    end
  end

  @writer.insert(rows)
rescue Google::Protobuf::ParseError => e
  raise Fluent::UnrecoverableError.new(e)
end