Class: Fluent::S3Output
- Inherits: TimeSlicedOutput
  - Object
  - TimeSlicedOutput
  - Fluent::S3Output
- Includes:
  - Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_s3.rb,
lib/fluent/plugin/s3_compressor_lzo.rb,
lib/fluent/plugin/s3_compressor_lzma2.rb,
lib/fluent/plugin/s3_compressor_gzip_command.rb
Defined Under Namespace
Classes: Compressor, GzipCommandCompressor, GzipCompressor, JsonCompressor, LZMA2Compressor, LZOCompressor, TextCompressor
Constant Summary

- MAX_HEX_RANDOM_LENGTH = 16
Constructor Details
#initialize ⇒ S3Output

Returns a new instance of S3Output.

# File 'lib/fluent/plugin/out_s3.rb', line 9

def initialize
  super
  require 'aws-sdk-resources'
  require 'zlib'
  require 'time'
  require 'tempfile'
  @compressor = nil
end
Instance Attribute Details
#bucket ⇒ Object
Returns the value of attribute bucket.
# File 'lib/fluent/plugin/out_s3.rb', line 120

def bucket
  @bucket
end
Class Method Details
.register_compressor(name, compressor) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 421

def self.register_compressor(name, compressor)
  COMPRESSOR_REGISTRY.register(name, compressor)
end
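A hedged sketch of how a custom compressor could hook in (ZstdCompressor, its method bodies, and the 'zstd' key are hypothetical; only register_compressor and the Compressor base class are defined by this plugin):

# Hypothetical example: the key passed to register_compressor is the
# value users would set in the store_as parameter.
class ZstdCompressor < Fluent::S3Output::Compressor
  def ext
    'zst'
  end

  def content_type
    'application/x-zstd'
  end

  def compress(chunk, tmp)
    # write the compressed chunk bytes into the tempfile here
  end
end

Fluent::S3Output.register_compressor('zstd', ZstdCompressor)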
Instance Method Details
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 129

def configure(conf)
  super

  if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
    raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  # Fall back to plain text output when store_as names an unknown compressor.
  begin
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
  rescue
    $log.warn "#{@store_as} not found. Use 'text' instead"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)

  # @path_slicer expands strftime placeholders embedded in the path.
  if @localtime
    @path_slicer = Proc.new {|path|
      Time.now.strftime(path)
    }
  else
    @path_slicer = Proc.new {|path|
      Time.now.utc.strftime(path)
    }
  end

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  if @reduced_redundancy
    $log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
    @storage_class = "REDUCED_REDUNDANCY"
  end

  @values_for_s3_object_chunk = {}
end
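The @path_slicer built above is plain strftime over the configured path, so time placeholders embedded in path expand at write time. A minimal sketch of the localtime-off branch (the path value is illustrative):

# Mirrors the else branch in configure; "logs/%Y/%m/%d/" is an example path.
path_slicer = Proc.new { |path| Time.now.utc.strftime(path) }
path_slicer.call("logs/%Y/%m/%d/")  # => e.g. "logs/2016/05/12/"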
#desc(description) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 22

# Intentionally a no-op: keeps config_param description calls from
# breaking on Fluentd versions that do not define desc.
def desc(description)
end
#format(tag, time, record) ⇒ Object

# File 'lib/fluent/plugin/out_s3.rb', line 187

def format(tag, time, record)
  @formatter.format(tag, time, record)
end
#placeholders ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 125

def placeholders
  [:percent]
end
#start ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 168

def start
  options = setup_credentials
  options[:region] = @s3_region if @s3_region
  options[:endpoint] = @s3_endpoint if @s3_endpoint
  options[:http_proxy] = @proxy_uri if @proxy_uri
  options[:force_path_style] = @force_path_style
  options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
  options[:signature_version] = @signature_version unless @signature_version.nil?
  s3_client = Aws::S3::Client.new(options)
  @s3 = Aws::S3::Resource.new(:client => s3_client)
  @bucket = @s3.bucket(@s3_bucket)

  check_apikeys if @check_apikey_on_start
  ensure_bucket

  super
end
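The options hash feeds directly into the aws-sdk v2 constructors. A standalone sketch with placeholder region and bucket name (in the plugin these come from setup_credentials and the s3_* parameters):

require 'aws-sdk-resources'

client = Aws::S3::Client.new(:region => 'us-east-1')   # placeholder region
s3     = Aws::S3::Resource.new(:client => client)
bucket = s3.bucket('my-log-bucket')                     # placeholder bucket

bucket.object('logs/example.gz').exists?                # => true or false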
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 191

def write(chunk)
  i = 0
  previous_path = nil

  # Generate candidate object keys until one is found that does not
  # already exist in the bucket (or overwriting is allowed).
  begin
    path = @path_slicer.call(@path)

    @values_for_s3_object_chunk[chunk.unique_id] ||= {
      "hex_random" => hex_random(chunk),
    }
    values_for_s3_object_key = {
      "path" => path,
      "time_slice" => chunk.key,
      "file_extension" => @compressor.ext,
      "index" => i,
      "uuid_flush" => uuid_random,
    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])

    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
      values_for_s3_object_key[expr[2...expr.size-1]]
    }
    if (i > 0) && (s3path == previous_path)
      if @overwrite
        log.warn "#{s3path} already exists, but will overwrite"
        break
      else
        raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
      end
    end

    i += 1
    previous_path = s3path
  end while @bucket.object(s3path).exists?

  # Compress the chunk into a tempfile, then upload it with the
  # configured storage, encryption, and ACL options.
  tmp = Tempfile.new("s3-")
  tmp.binmode
  begin
    @compressor.compress(chunk, tmp)
    tmp.rewind
    log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }

    put_options = {
      :body => tmp,
      :content_type => @compressor.content_type,
      :storage_class => @storage_class,
    }
    put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
    put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
    put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
    put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
    put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
    put_options[:acl] = @acl if @acl
    @bucket.object(s3path).put(put_options)

    @values_for_s3_object_chunk.delete(chunk.unique_id)

    if @warn_for_delay
      if Time.strptime(chunk.key, @time_slice_format) < Time.now - @warn_for_delay
        log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" }
      end
    end
  ensure
    tmp.close(true) rescue nil
  end
end
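Key generation above is plain string substitution over s3_object_key_format. A self-contained sketch using the plugin's default key format and illustrative values:

values = {
  "path"           => "logs/",     # illustrative
  "time_slice"     => "20160512",  # chunk.key for a %Y%m%d time slice
  "file_extension" => "gz",
  "index"          => 0,           # disambiguates flushes within a slice
}

key_format = "%{path}%{time_slice}_%{index}.%{file_extension}"
s3path = key_format.gsub(%r(%{[^}]+})) { |expr|
  values[expr[2...expr.size - 1]]
}
# s3path => "logs/20160512_0.gz"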