Class: Fluent::Plugin::S3Output

Inherits:
Output
  • Object
Defined in:
lib/fluent/plugin/out_s3.rb,
lib/fluent/plugin/s3_compressor_lzo.rb,
lib/fluent/plugin/s3_compressor_lzma2.rb,
lib/fluent/plugin/s3_compressor_gzip_command.rb

Defined Under Namespace

Classes: Compressor, GzipCommandCompressor, GzipCompressor, JsonCompressor, LZMA2Compressor, LZOCompressor, TextCompressor

Constant Summary

DEFAULT_FORMAT_TYPE = "out_file"
MAX_HEX_RANDOM_LENGTH = 16

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ S3Output

Returns a new instance of S3Output.



# File 'lib/fluent/plugin/out_s3.rb', line 15

def initialize
  super
  @compressor = nil
  @uuid_flush_enabled = false
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



# File 'lib/fluent/plugin/out_s3.rb', line 137

def bucket
  @bucket
end

Instance Method Details

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_s3.rb', line 141

def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)

  super

  Aws.use_bundled_cert! if @use_bundled_cert

  if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
    raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  begin
    buffer_type = @buffer_config[:@type]
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
  rescue
    log.warn "#{@store_as} not found. Use 'text' instead"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = formatter_create

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  unless @index_format =~ /^%(0\d*)?[dxX]$/
    raise Fluent::ConfigError, "index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types" 
  end

  if @reduced_redundancy
    log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
    @storage_class = "REDUCED_REDUNDANCY"
  end

  @s3_object_key_format = process_s3_object_key_format
  if !@check_object
    if conf.has_key?('s3_object_key_format')
      log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten."
    else
      log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{date_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format"
      @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}"
    end
  end

  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  @values_for_s3_object_chunk = {}
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
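
For orientation, a hedged configuration sketch of the parameters this method validates. The parameter names are real; the values, bucket name, and use of fluentd's standard test driver are illustrative assumptions:

require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_s3'

Fluent::Test.setup

# hex_random_length must be <= MAX_HEX_RANDOM_LENGTH (16) and
# index_format must match /^%(0\d*)?[dxX]$/, else Fluent::ConfigError.
driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::S3Output)
driver.configure(<<~CONF)
  s3_bucket my-bucket
  s3_region us-east-1
  store_as gzip
  hex_random_length 16
  index_format %08d
CONF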

#format(tag, time, record) ⇒ Object



# File 'lib/fluent/plugin/out_s3.rb', line 221

def format(tag, time, record)
  r = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, r)
end
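
Illustrative only: with the default out_file formatter (see DEFAULT_FORMAT_TYPE above), each formatted entry is the event time, tag, and JSON-encoded record, tab-separated. A standalone sketch of that shape, not a call into the plugin:

require 'json'
require 'time'

# Roughly what one out_file-formatted line looks like.
time   = Time.utc(2024, 1, 1).iso8601
record = { "message" => "hello" }
line   = [time, "s3.test", JSON.generate(record)].join("\t")
# => "2024-01-01T00:00:00Z\ts3.test\t{\"message\":\"hello\"}"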

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/out_s3.rb', line 193

def multi_workers_ready?
  true
end

#start ⇒ Object



# File 'lib/fluent/plugin/out_s3.rb', line 197

def start
  options = setup_credentials
  options[:region] = @s3_region if @s3_region
  options[:endpoint] = @s3_endpoint if @s3_endpoint
  options[:http_proxy] = @proxy_uri if @proxy_uri
  options[:force_path_style] = @force_path_style
  options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
  options[:signature_version] = @signature_version unless @signature_version.nil?
  options[:ssl_verify_peer] = @ssl_verify_peer
  log.on_trace do
    options[:http_wire_trace] = true
    options[:logger] = log
  end

  s3_client = Aws::S3::Client.new(options)
  @s3 = Aws::S3::Resource.new(client: s3_client)
  @bucket = @s3.bucket(@s3_bucket)

  check_apikeys if @check_apikey_on_start
  ensure_bucket if @check_bucket

  super
end
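
A hedged sketch of the client this method assembles when s3_region and proxy_uri are set; values and bucket name are hypothetical, and the credential options normally merged in by setup_credentials are omitted:

require 'aws-sdk-s3'

client = Aws::S3::Client.new(
  region: "us-east-1",                          # from s3_region
  http_proxy: "http://proxy.example.com:8080",  # from proxy_uri
  force_path_style: false,
  ssl_verify_peer: true
)
bucket = Aws::S3::Resource.new(client: client).bucket("my-bucket")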

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_s3.rb', line 226

def write(chunk)
  i = 0
  metadata = chunk.metadata
  previous_path = nil
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 @time_slice_with_tz.call(metadata.timekey)
               end

  if @check_object
    begin
      @values_for_s3_object_chunk[chunk.unique_id] ||= {
        "%{hex_random}" => hex_random(chunk),
      }
      values_for_s3_object_key_pre = {
        "%{path}" => @path,
        "%{file_extension}" => @compressor.ext,
      }
      values_for_s3_object_key_post = {
        "%{time_slice}" => time_slice,
        "%{index}" => sprintf(@index_format,i),
      }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
      values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

      s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
        values_for_s3_object_key_pre.fetch(matched_key, matched_key)
      end
      s3path = extract_placeholders(s3path, metadata)
      s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
      if (i > 0) && (s3path == previous_path)
        if @overwrite
          log.warn "#{s3path} already exists, but will overwrite"
          break
        else
          raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
        end
      end

      i += 1
      previous_path = s3path
    end while @bucket.object(s3path).exists?
  else
    if @localtime
      hms_slicer = Time.now.strftime("%H%M%S")
    else
      hms_slicer = Time.now.utc.strftime("%H%M%S")
    end

    @values_for_s3_object_chunk[chunk.unique_id] ||= {
      "%{hex_random}" => hex_random(chunk),
    }
    values_for_s3_object_key_pre = {
      "%{path}" => @path,
      "%{file_extension}" => @compressor.ext,
    }
    values_for_s3_object_key_post = {
      "%{date_slice}" => time_slice,
      "%{hms_slice}" => hms_slicer,
    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
    values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
      values_for_s3_object_key_pre.fetch(matched_key, matched_key)
    end
    s3path = extract_placeholders(s3path, metadata)
    s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
  end

  tmp = Tempfile.new("s3-")
  tmp.binmode
  begin
    @compressor.compress(chunk, tmp)
    tmp.rewind
    log.debug "out_s3: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.} to s3://#{@s3_bucket}/#{s3path}"

    put_options = {
      body: tmp,
      content_type: @compressor.content_type,
      storage_class: @storage_class,
    }
    put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
    put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
    put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
    put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
    put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
    put_options[:acl] = @acl if @acl
    put_options[:grant_full_control] = @grant_full_control if @grant_full_control
    put_options[:grant_read] = @grant_read if @grant_read
    put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp
    put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp

    if @s3_metadata
      put_options[:metadata] = {}
      @s3_metadata.each do |k, v|
        put_options[:metadata][k] = extract_placeholders(v, metadata).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
      end
    end
    @bucket.object(s3path).put(put_options)

    @values_for_s3_object_chunk.delete(chunk.unique_id)

    if @warn_for_delay
      if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
        log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
      end
    end
  ensure
    tmp.close(true) rescue nil
  end
end
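
The two-pass placeholder substitution above is easy to misread: the pre values (%{path}, %{file_extension}) are applied first, extract_placeholders then expands fluentd's own tag- and time-based placeholders, and the post values (%{time_slice}, %{index}, and so on) are applied last. A standalone sketch with hypothetical values, using the same regexp as the source; the middle extract_placeholders pass is skipped here:

key_format = "%{path}/%{time_slice}_%{index}.%{file_extension}"

pre  = { "%{path}" => "logs", "%{file_extension}" => "gz" }
post = { "%{time_slice}" => "20240101", "%{index}" => "0" }

s3path = key_format.gsub(%r(%{[^}]+})) { |key| pre.fetch(key, key) }  # first pass
s3path = s3path.gsub(%r(%{[^}]+}), post)                              # second pass
# => "logs/20240101_0.gz"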