Class: Fluent::Plugin::BigQueryStorageWriteInsertOutput
- Inherits:
- Output
- Object
- Output
- Fluent::Plugin::BigQueryStorageWriteInsertOutput
- Defined in:
- lib/fluent/plugin/out_bigquery_storage_write_insert.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 43

# Validates the authentication settings once the parent class has handled
# the configuration.
#
# Which settings are required depends on +@auth_method+:
# * +:private_key+ needs both +email+ and +private_key_path+
# * +:json_key+ needs +json_key+
# * +:compute_engine+ and +:application_default+ need nothing further
#
# @param conf the configuration object; passed on to the parent via +super+
# @raise [Fluent::ConfigError] if a required setting is missing or
#   +@auth_method+ is none of the values listed above
def configure(conf)
  super

  case @auth_method
  when :compute_engine, :application_default
    # Nothing to validate: these methods take no extra settings.
  when :private_key
    if !@email || !@private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :json_key
    raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'" unless @json_key
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end
end
#format(tag, time, record) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 96

# Serializes one event as a single line of JSON.
#
# The record is first passed through +inject_values_to_record+ and then
# rendered with +to_json+ plus a trailing newline. A +nil+ record is not
# serialized: a warning is logged and +nil+ is returned.
#
# @param tag the event tag (handed to +inject_values_to_record+ and used in
#   the warning message)
# @param time the event time (same usage as +tag+)
# @param record the event record, or +nil+
# @return [String, nil] the JSON line, or +nil+ for a +nil+ record
# @raise re-raises any StandardError after logging it as "format error"
def format(tag, time, record)
  unless record.nil?
    # Reassign +record+ on purpose: the rescue below logs whatever value it
    # holds at the moment of failure.
    record = inject_values_to_record(tag, time, record)
    return "#{record.to_json}\n"
  end

  log.warn("nil record detected. corrupted chunks? tag=#{tag}, time=#{time}")
  nil
rescue StandardError
  log.error("format error", record: record)
  raise
end
#multi_workers_ready? ⇒ Boolean
122 123 124 |
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 122

# Reports that this plugin can be used with multiple workers.
# NOTE(review): presumably consulted by Fluentd's multi-worker support;
# confirm against the Fluentd plugin API.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end
#start ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 64

# Prepares the protobuf message class and the writer object.
#
# After the parent +start+ has run, this method:
# 1. picks the message class name: +@proto_message_class_name+, or the
#    result of +Helper.snake_to_pascal(@table)+ when that is +nil+;
# 2. loads the serialized descriptor for +@proto_schema_rb_path+ and finds
#    the message descriptor whose name matches;
# 3. adds the serialized file to the generated descriptor pool (a
#    +Google::Protobuf::TypeError+ here is only logged as a warning) and
#    resolves the message class into +@klass+;
# 4. builds +@writer+ from the connection and credential settings.
#
# @raise [Fluent::UnrecoverableError] wrapping any StandardError raised in
#   this method, including a descriptor or message class that cannot be found
def start
  super

  message_cls_name = @proto_message_class_name
  message_cls_name = Fluent::BigQuery::Storage::Helper.snake_to_pascal(@table) if message_cls_name.nil?

  descriptor_data = Fluent::BigQuery::Storage::Helper.get_descriptor_data(@proto_schema_rb_path)

  parsed = Google::Protobuf::FileDescriptorProto.decode(descriptor_data)
  @descriptor_proto = parsed.message_type.find { |msg| msg.name == message_cls_name }
  raise "No descriptor proto found. class_name=#{message_cls_name}" if @descriptor_proto.nil?

  pool = Google::Protobuf::DescriptorPool.generated_pool
  begin
    pool.add_serialized_file(descriptor_data)
  rescue Google::Protobuf::TypeError
    # Registration is best-effort: the lookup below still runs after a failure.
    log.warn("unable to build file to DescriptorPool. duplicate proto file? you have to restart fluentd process to reload proto.")
  end

  # NOTE(review): the pool is queried with the bare message name; confirm
  # this is correct for schemas that declare a proto package.
  message_descriptor = pool.lookup(message_cls_name)
  # Fail with a descriptive message instead of a NoMethodError on nil.
  raise "No message class found in DescriptorPool. class_name=#{message_cls_name}" if message_descriptor.nil?

  @klass = message_descriptor.msgclass

  @writer = Fluent::BigQuery::Storage::Writer.new(
    @log, @auth_method, @project, @dataset, @table, @descriptor_proto,
    private_key_path: @private_key_path,
    private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key
  )
rescue => e
  raise Fluent::UnrecoverableError.new(e)
end
#write(chunk) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 109

# Converts every line of a buffered chunk and hands the result to the writer.
#
# Each line is decoded with +@klass.decode_json+ (honouring
# +@ignore_unknown_fields+) and re-encoded with +@klass.encode+; the
# collected rows are then passed to +@writer.insert+ in a single call.
#
# @param chunk the buffer chunk; must respond to +open+ and yield an
#   enumerable of lines
# @return whatever +@writer.insert+ returns
# @raise [Fluent::UnrecoverableError] wrapping a
#   +Google::Protobuf::ParseError+ raised during the conversion
def write(chunk)
  encoded_rows = chunk.open do |stream|
    stream.map do |json_line|
      @klass.encode(@klass.decode_json(json_line, ignore_unknown_fields: @ignore_unknown_fields))
    end
  end

  @writer.insert(encoded_rows)
rescue Google::Protobuf::ParseError => parse_error
  raise Fluent::UnrecoverableError.new(parse_error)
end