Class: Fluent::Plugin::BigQueryStorageWriteInsertOutput
- Inherits: Output
  - Object
  - Output
  - Fluent::Plugin::BigQueryStorageWriteInsertOutput
- Defined in: lib/fluent/plugin/out_bigquery_storage_write_insert.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
```ruby
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 43

def configure(conf)
  super

  case @auth_method
  when :private_key
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :compute_engine
    # Do nothing
  when :json_key
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when :application_default
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end
end
```
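To see this validation in action without a live Fluentd pipeline, the bundled test driver can call #configure directly. A minimal sketch, assuming the parameter names match the instance variables used elsewhere on this page (project, dataset, table) and that no other parameter is mandatory:

```ruby
require "fluent/test"
require "fluent/test/driver/output"
require "fluent/plugin/out_bigquery_storage_write_insert"

Fluent::Test.setup

driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::BigQueryStorageWriteInsertOutput)

# auth_method is json_key but json_key itself is missing, so the
# case statement above raises Fluent::ConfigError.
begin
  driver.configure(%[
    auth_method json_key
    project my-project
    dataset my_dataset
    table access_log
  ])
rescue Fluent::ConfigError => e
  puts e.message # => "'json_key' must be specified if auth_method == 'json_key'"
end
```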
#format(tag, time, record) ⇒ Object
```ruby
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 92

def format(tag, time, record)
  if record.nil?
    log.warn("nil record detected. corrupted chunks? tag=#{tag}, time=#{time}")
    return
  end

  record = inject_values_to_record(tag, time, record)
  record.to_json + "\n"
rescue
  log.error("format error", record: record)
  raise
end
```
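#format is the buffer-side half of the pipeline: each event is stored as one JSON document per line, and #write later re-parses those lines. A minimal sketch of the resulting chunk line (inject_values_to_record only merges in any configured inject fields first):

```ruby
record = { "id" => 1, "name" => "alice" }
line = record.to_json + "\n"
# => "{\"id\":1,\"name\":\"alice\"}\n"  # one newline-delimited JSON row per event
```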
#multi_workers_ready? ⇒ Boolean
```ruby
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 116

def multi_workers_ready?
  true
end
```
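Returning true declares the plugin safe to run under Fluentd's multi-process workers setting; each worker process builds its own writer in #start, so no state is shared between workers.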
#start ⇒ Object
```ruby
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 64

def start
  super

  # The local name message_cls_name is implied by the usages below; the
  # right-hand side of this first assignment was lost in extraction and
  # @proto_message_class_name is a reconstruction.
  message_cls_name = @proto_message_class_name
  if message_cls_name.nil?
    message_cls_name = Fluent::BigQuery::Storage::Helper.snake_to_pascal(@table)
  end

  descriptor_data = Fluent::BigQuery::Storage::Helper.get_descriptor_data(@proto_schema_rb_path)
  Google::Protobuf::DescriptorPool.generated_pool.add_serialized_file(descriptor_data)
  parsed = Google::Protobuf::FileDescriptorProto.decode(descriptor_data)

  @descriptor_proto = parsed.message_type.find { |msg| msg.name == message_cls_name }
  if @descriptor_proto.nil?
    raise "No descriptor proto found. class_name=#{message_cls_name}"
  end

  @klass = Google::Protobuf::DescriptorPool.generated_pool.lookup(message_cls_name).msgclass
  @writer = Fluent::BigQuery::Storage::Writer.new(@log, @auth_method, @project, @dataset, @table,
                                                  @descriptor_proto,
                                                  private_key_path: @private_key_path,
                                                  private_key_passphrase: @private_key_passphrase,
                                                  email: @email,
                                                  json_key: @json_key)
rescue => e
  log.error("initialize error")
  raise Fluent::UnrecoverableError, e
end
```
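When no explicit message class name is given, it defaults to the table name converted from snake_case to PascalCase. Fluent::BigQuery::Storage::Helper.snake_to_pascal is not shown on this page; a hypothetical re-implementation to illustrate the assumed mapping:

```ruby
# Hypothetical stand-in for Fluent::BigQuery::Storage::Helper.snake_to_pascal,
# shown only to illustrate the assumed table -> message-name mapping.
def snake_to_pascal(str)
  str.split("_").map(&:capitalize).join
end

snake_to_pascal("access_log") # => "AccessLog"
```

So a table named access_log is expected to correspond to a protoc-generated message named AccessLog in the serialized descriptor loaded from proto_schema_rb_path.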
#write(chunk) ⇒ Object
```ruby
# File 'lib/fluent/plugin/out_bigquery_storage_write_insert.rb', line 105

def write(chunk)
  rows = chunk.open do |io|
    io.map do |line|
      val = @klass.decode_json(line, ignore_unknown_fields: @ignore_unknown_fields)
      @klass.encode(val)
    end
  end

  @writer.insert(rows)
end
```
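Each buffered line is decoded back into a protobuf message and re-encoded to binary before @writer.insert hands the rows to the BigQuery Storage Write API. A minimal sketch of the per-line round-trip, using a hypothetical protoc-generated class AccessLog in place of @klass:

```ruby
# AccessLog stands in for @klass, the protoc-generated message class
# looked up in #start.
msg   = AccessLog.decode_json('{"id":1,"name":"alice"}', ignore_unknown_fields: true)
bytes = AccessLog.encode(msg) # one binary protobuf row, as collected into `rows`
```

Buffering JSON in #format and deferring protobuf encoding to write time keeps chunk contents text-based and debuggable, at the cost of serializing each row twice.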