Class: Fluent::S3Output
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::S3Output
show all
- Includes:
- Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_s3.rb,
lib/fluent/plugin/s3_compressor_lzo.rb,
lib/fluent/plugin/s3_compressor_lzma2.rb,
lib/fluent/plugin/s3_compressor_gzip_command.rb
Defined Under Namespace
Classes: Compressor, GzipCommandCompressor, GzipCompressor, JsonCompressor, LZMA2Compressor, LZOCompressor, TextCompressor
Constant Summary
collapse
- MAX_HEX_RANDOM_LENGTH =
16
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of S3Output.
8
9
10
11
12
13
14
15
16
|
# File 'lib/fluent/plugin/out_s3.rb', line 8
def initialize
super
require 'aws-sdk-resources'
require 'zlib'
require 'time'
require 'tempfile'
@compressor = nil
end
|
Instance Attribute Details
#bucket ⇒ Object
Returns the value of attribute bucket.
119
120
121
|
# File 'lib/fluent/plugin/out_s3.rb', line 119
def bucket
@bucket
end
|
Instance Method Details
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
# File 'lib/fluent/plugin/out_s3.rb', line 128
def configure(conf)
super
if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
end
begin
@compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
rescue
$log.warn "#{@store_as} not found. Use 'text' instead"
@compressor = TextCompressor.new
end
@compressor.configure(conf)
@formatter = Plugin.new_formatter(@format)
@formatter.configure(conf)
if @localtime
@path_slicer = Proc.new {|path|
Time.now.strftime(path)
}
else
@path_slicer = Proc.new {|path|
Time.now.utc.strftime(path)
}
end
if @hex_random_length > MAX_HEX_RANDOM_LENGTH
raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
end
if @reduced_redundancy
$log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
@storage_class = "REDUCED_REDUNDANCY"
end
@values_for_s3_object_chunk = {}
end
|
#desc(description) ⇒ Object
21
22
|
# File 'lib/fluent/plugin/out_s3.rb', line 21
def desc(description)
end
|
188
189
190
|
# File 'lib/fluent/plugin/out_s3.rb', line 188
def format(tag, time, record)
@formatter.format(tag, time, record)
end
|
#placeholders ⇒ Object
124
125
126
|
# File 'lib/fluent/plugin/out_s3.rb', line 124
def placeholders
[:percent]
end
|
#start ⇒ Object
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/fluent/plugin/out_s3.rb', line 167
def start
super
options = setup_credentials
options[:region] = @s3_region if @s3_region
options[:endpoint] = @s3_endpoint if @s3_endpoint
options[:http_proxy] = @proxy_uri if @proxy_uri
options[:force_path_style] = @force_path_style
options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
options[:signature_version] = @signature_version unless @signature_version.nil?
s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(:client => s3_client)
@bucket = @s3.bucket(@s3_bucket)
check_apikeys if @check_apikey_on_start
ensure_bucket
@hex_random_n = (@hex_random_length + 1) / 2
end
|
#write(chunk) ⇒ Object
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
# File 'lib/fluent/plugin/out_s3.rb', line 192
def write(chunk)
i = 0
previous_path = nil
begin
path = @path_slicer.call(@path)
@values_for_s3_object_chunk[chunk.unique_id] ||= {
"hex_random" => hex_random(chunk),
}
values_for_s3_object_key = {
"path" => path,
"time_slice" => chunk.key,
"file_extension" => @compressor.ext,
"index" => i,
"uuid_flush" => uuid_random,
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
values_for_s3_object_key[expr[2...expr.size-1]]
}
if (i > 0) && (s3path == previous_path)
if @overwrite
log.warn "#{s3path} already exists, but will overwrite"
break
else
raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
end
end
i += 1
previous_path = s3path
end while @bucket.object(s3path).exists?
tmp = Tempfile.new("s3-")
tmp.binmode
begin
@compressor.compress(chunk, tmp)
tmp.rewind
log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }
put_options = {
:body => tmp,
:content_type => @compressor.content_type,
:storage_class => @storage_class,
}
put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
put_options[:acl] = @acl if @acl
@bucket.object(s3path).put(put_options)
@values_for_s3_object_chunk.delete(chunk.unique_id)
if @warn_for_delay
if Time.strptime(chunk.key, @time_slice_format) < Time.now - @warn_for_delay
log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" }
end
end
ensure
tmp.close(true) rescue nil
end
end
|