Class: Fluent::S3Output

Inherits: TimeSlicedOutput (which inherits from Object)
Defined in:
lib/fluent/plugin/out_s3.rb,
lib/fluent/plugin/s3_compressor_lzo.rb,
lib/fluent/plugin/s3_compressor_lzma2.rb,
lib/fluent/plugin/s3_compressor_gzip_command.rb

Defined Under Namespace

Classes: Compressor, GzipCommandCompressor, GzipCompressor, JsonCompressor, LZMA2Compressor, LZOCompressor, TextCompressor

Constant Summary collapse

MAX_HEX_RANDOM_LENGTH =
16

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ S3Output

Returns a new instance of S3Output.



8
9
10
11
12
13
14
15
16
17
# File 'lib/fluent/plugin/out_s3.rb', line 8

def initialize
  super
  require 'aws-sdk-resources'
  require 'zlib'
  require 'time'
  require 'tempfile'

  @compressor = nil
  @uuid_flush_enabled = false
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



128
129
130
# File 'lib/fluent/plugin/out_s3.rb', line 128

def bucket
  @bucket
end

Class Method Details

.register_compressor(name, compressor) ⇒ Object



496
497
498
# File 'lib/fluent/plugin/out_s3.rb', line 496

def self.register_compressor(name, compressor)
  COMPRESSOR_REGISTRY.register(name, compressor)
end

Instance Method Details

#configure(conf) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluent/plugin/out_s3.rb', line 132

def configure(conf)
  super

  if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
    raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  begin
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
  rescue
    $log.warn "#{@store_as} not found. Use 'text' instead"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)

  if @localtime
    @path_slicer = Proc.new {|path|
      Time.now.strftime(path)
    }
  else
    @path_slicer = Proc.new {|path|
      Time.now.utc.strftime(path)
    }
  end

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  if @reduced_redundancy
    $log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
    @storage_class = "REDUCED_REDUNDANCY"
  end

  @path = process_deprecated_placeholders(@path)
  @s3_object_key_format = process_deprecated_placeholders(@s3_object_key_format)
  @values_for_s3_object_chunk = {}
end

#desc(description) ⇒ Object



22
23
# File 'lib/fluent/plugin/out_s3.rb', line 22

def desc(description)
end

#format(tag, time, record) ⇒ Object



202
203
204
# File 'lib/fluent/plugin/out_s3.rb', line 202

def format(tag, time, record)
  @formatter.format(tag, time, record)
end

#start ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/fluent/plugin/out_s3.rb', line 174

def start
  options = setup_credentials
  options[:region] = @s3_region if @s3_region
  options[:endpoint] = @s3_endpoint if @s3_endpoint
  options[:http_proxy] = @proxy_uri if @proxy_uri
  options[:force_path_style] = @force_path_style
  options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
  options[:signature_version] = @signature_version unless @signature_version.nil?
  options[:ssl_verify_peer] = @ssl_verify_peer
  log.on_trace do
    options[:http_wire_trace] = true
    options[:logger] = log
  end

  s3_client = Aws::S3::Client.new(options)
  @s3 = Aws::S3::Resource.new(:client => s3_client)
  @bucket = @s3.bucket(@s3_bucket)

  check_apikeys if @check_apikey_on_start
  ensure_bucket if @check_bucket

  if !@check_object
    @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}"
  end

  super
end

#write(chunk) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/fluent/plugin/out_s3.rb', line 206

def write(chunk)
  i = 0
  previous_path = nil

  if @check_object
    begin
      path = @path_slicer.call(@path)

      @values_for_s3_object_chunk[chunk.unique_id] ||= {
        "hex_random" => hex_random(chunk),
      }
      values_for_s3_object_key = {
        "path" => path,
        "time_slice" => chunk.key,
        "file_extension" => @compressor.ext,
        "index" => i,
      }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
      values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled

      s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
        values_for_s3_object_key[expr[2...expr.size-1]]
      }
      if (i > 0) && (s3path == previous_path)
        if @overwrite
          log.warn "#{s3path} already exists, but will overwrite"
          break
        else
          raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
        end
      end

      i += 1
      previous_path = s3path
    end while @bucket.object(s3path).exists?
  else
    if @localtime
      hms_slicer = Time.now.strftime("%H%M%S")
    else
      hms_slicer = Time.now.utc.strftime("%H%M%S")
    end

    values_for_s3_object_key = {
      "path" => @path_slicer.call(@path),
      "date_slice" => chunk.key,
      "file_extension" => @compressor.ext,
      "hms_slice" => hms_slicer,
    }
    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
      values_for_s3_object_key[expr[2...expr.size-1]]
    }
  end

  tmp = Tempfile.new("s3-", @tmp_dir)
  tmp.binmode
  begin
    @compressor.compress(chunk, tmp)
    tmp.rewind
    log.debug { "out_s3: write chunk: {key:#{chunk.key},unique_id:#{unique_hex(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }

    put_options = {
      :body => tmp,
      :content_type => @compressor.content_type,
      :storage_class => @storage_class,
    }
    put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
    put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
    put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
    put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
    put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
    put_options[:acl] = @acl if @acl
    @bucket.object(s3path).put(put_options)

    @values_for_s3_object_chunk.delete(chunk.unique_id)

    if @warn_for_delay
      if Time.strptime(chunk.key, @time_slice_format) < Time.now - @warn_for_delay
        log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" }
      end
    end
  ensure
    tmp.close(true) rescue nil
  end
end