Class: Fluent::Plugin::GCSOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::GCSOutput
- Defined in:
- lib/fluent/plugin/out_gcs.rb
Constant Summary collapse
- DEFAULT_FORMAT_TYPE =
"out_file"- MAX_HEX_RANDOM_LENGTH =
32
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ GCSOutput
constructor
A new instance of GCSOutput.
- #multi_workers_ready? ⇒ Boolean
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GCSOutput
Returns a new instance of GCSOutput.
15 16 17 18 19 |
# File 'lib/fluent/plugin/out_gcs.rb', line 15 def initialize super require "google/cloud/storage" Google::Apis.logger = log end |
Instance Method Details
#configure(conf) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/out_gcs.rb', line 73 def configure(conf) compat_parameters_convert(conf, :buffer, :formatter, :inject) super if @hex_random_length > MAX_HEX_RANDOM_LENGTH raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less." end # The customer-supplied, AES-256 encryption key that will be used to encrypt the file. @encryption_opts = { encryption_key: @encryption_key, } if = .map {|m| [m.key, m.value] }.to_h end @formatter = formatter_create @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding) # For backward compatibility # TODO: Remove time_slice_format when end of support compat_parameters @configured_time_slice_format = conf['time_slice_format'] @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])) end |
#format(tag, time, record) ⇒ Object
112 113 114 115 |
# File 'lib/fluent/plugin/out_gcs.rb', line 112 def format(tag, time, record) r = inject_values_to_record(tag, time, record) @formatter.format(tag, time, r) end |
#multi_workers_ready? ⇒ Boolean
117 118 119 |
# File 'lib/fluent/plugin/out_gcs.rb', line 117 def multi_workers_ready? true end |
#start ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/out_gcs.rb', line 99 def start @gcs = Google::Cloud::Storage.new( project: @project, keyfile: @keyfile, retries: @client_retries, timeout: @client_timeout ) @gcs_bucket = @gcs.bucket(@bucket) ensure_bucket super end |
#write(chunk) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/fluent/plugin/out_gcs.rb', line 121 def write(chunk) path = generate_path(chunk) @object_creator.create(chunk) do |obj| opts = { metadata: , acl: @acl, storage_class: @storage_class, content_type: @object_creator.content_type, content_encoding: @object_creator.content_encoding, } opts.merge!(@encryption_opts) log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{opts}" } @gcs_bucket.upload_file(obj.path, path, **opts) end end |