Class: Fluent::S3Output
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::S3Output
show all
- Includes:
- Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_s3.rb,
lib/fluent/plugin/s3_compressor_lzo.rb,
lib/fluent/plugin/s3_compressor_lzma2.rb,
lib/fluent/plugin/s3_compressor_gzip_command.rb
Defined Under Namespace
Classes: Compressor, GzipCommandCompressor, GzipCompressor, JsonCompressor, LZMA2Compressor, LZOCompressor, TextCompressor
Constant Summary
collapse
- MAX_HEX_RANDOM_LENGTH =
16
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of S3Output.
8
9
10
11
12
13
14
15
16
|
# File 'lib/fluent/plugin/out_s3.rb', line 8
# Creates the output plugin instance.
#
# The libraries this plugin relies on are loaded here, at construction time,
# in the order listed. No compressor is selected yet; @compressor starts as
# nil and is assigned in #configure.
def initialize
  super
  %w(aws-sdk-resources zlib time tempfile).each { |library| require library }
  @compressor = nil
end
|
Instance Attribute Details
#bucket ⇒ Object
Returns the value of attribute bucket.
113
114
115
|
# File 'lib/fluent/plugin/out_s3.rb', line 113
# Reader for the @bucket instance variable.
#
# @bucket is assigned in #start from @s3.bucket(@s3_bucket), so this returns
# nil until #start has run.
def bucket
@bucket
end
|
Instance Method Details
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/fluent/plugin/out_s3.rb', line 122
# Validates the plugin configuration and prepares the helpers used by #write.
#
# After delegating to the superclass this method:
# * rejects an s3_endpoint that ends with 'amazonaws.com';
# * instantiates the compressor registered under @store_as, falling back to
#   TextCompressor when lookup or construction fails;
# * creates and configures the formatter named by @format;
# * builds @path_slicer, which expands strftime directives in a path using
#   the current time (local time when @localtime is set, UTC otherwise);
# * rejects a hex_random_length greater than MAX_HEX_RANDOM_LENGTH;
# * sets the storage class to "REDUCED_REDUNDANCY" when @reduced_redundancy
#   is set;
# * resets the per-chunk cache of object-key values.
#
# @param conf the configuration element; also passed to the compressor and
#   the formatter
# @raise [ConfigError] for an unsupported s3_endpoint or an oversized
#   hex_random_length
def configure(conf)
  super

  if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
    raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  begin
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
  rescue => e
    # Report through the plugin logger, like the rest of this class, instead
    # of the global $log. The rescue also covers failures raised while
    # constructing the compressor, so the underlying error is included to
    # keep the fallback diagnosable.
    log.warn "#{@store_as} not found. Use 'text' instead: error = #{e.message}"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)

  @path_slicer =
    if @localtime
      Proc.new { |path| Time.now.strftime(path) }
    else
      Proc.new { |path| Time.now.utc.strftime(path) }
    end

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  @storage_class = "REDUCED_REDUNDANCY" if @reduced_redundancy
  # Keyed by chunk.unique_id; filled and cleared by #write.
  @values_for_s3_object_chunk = {}
end
|
#desc(description) ⇒ Object
21
22
|
# File 'lib/fluent/plugin/out_s3.rb', line 21
# Accepts a description and ignores it; the body is empty, so the call
# returns nil.
# NOTE(review): presumably a no-op stand-in for a `desc` helper used to
# annotate configuration parameters — confirm against the full file.
def desc(description)
end
|
179
180
181
|
# File 'lib/fluent/plugin/out_s3.rb', line 179
# Serializes one event by delegating to the formatter created in #configure.
#
# The three arguments are passed through unchanged and the formatter's
# return value is returned as-is.
def format(tag, time, record)
@formatter.format(tag, time, record)
end
|
#placeholders ⇒ Object
118
119
120
|
# File 'lib/fluent/plugin/out_s3.rb', line 118
# Returns a one-element array containing :percent.
# NOTE(review): presumably read by Mixin::ConfigPlaceholders to select the
# placeholder syntax expanded in configuration values — confirm.
def placeholders
[:percent]
end
|
#start ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
# File 'lib/fluent/plugin/out_s3.rb', line 158
# Starts the plugin: builds the S3 client, resolves the target bucket and
# runs the start-up checks.
#
# Client options begin with whatever #setup_credentials returns. Region,
# endpoint and proxy are added only when set to a truthy value;
# force_path_style is always passed; compute_checksums and signature_version
# are passed whenever they are not nil (so an explicit false is forwarded).
def start
  super

  client_options = setup_credentials

  # Optional settings that are forwarded only when truthy.
  {
    :region => @s3_region,
    :endpoint => @s3_endpoint,
    :http_proxy => @proxy_uri,
  }.each do |name, value|
    client_options[name] = value if value
  end

  client_options[:force_path_style] = @force_path_style

  # Optional settings where false is meaningful, so only nil is skipped.
  {
    :compute_checksums => @compute_checksums,
    :signature_version => @signature_version,
  }.each do |name, value|
    client_options[name] = value unless value.nil?
  end

  @s3 = Aws::S3::Resource.new(:client => Aws::S3::Client.new(client_options))
  @bucket = @s3.bucket(@s3_bucket)

  check_apikeys if @check_apikey_on_start
  ensure_bucket

  # Half of the requested hex length, rounded up.
  @hex_random_n = (@hex_random_length + 1) / 2
end
|
#write(chunk) ⇒ Object
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
# File 'lib/fluent/plugin/out_s3.rb', line 183
# Uploads one buffer chunk to the bucket as a single object.
#
# The method first picks an object key by expanding the %{...} placeholders
# in @s3_object_key_format, bumping the "index" value until a key is found
# that does not exist in the bucket yet. If two consecutive attempts yield
# the same key, the key is reused when @overwrite is set and an error is
# raised otherwise. The chunk is then compressed into a temporary file and
# put to S3.
#
# @param chunk the buffer chunk to upload; its unique_id and key are read
# @raise [RuntimeError] when the generated key repeats and @overwrite is off
def write(chunk)
  attempt = 0
  last_key = nil
  s3path = nil

  loop do
    sliced_path = @path_slicer.call(@path)

    # Cached per chunk and removed only after a successful put, so a later
    # call for the same chunk sees the same hex_random value.
    @values_for_s3_object_chunk[chunk.unique_id] ||= {
      "hex_random" => hex_random(chunk),
    }

    key_values = {
      "path" => sliced_path,
      "time_slice" => chunk.key,
      "file_extension" => @compressor.ext,
      "index" => attempt,
      "uuid_flush" => uuid_random,
    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])

    # Strip the leading "%{" and trailing "}" to get the placeholder name.
    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |placeholder|
      key_values[placeholder[2...-1]]
    end

    # The key did not change between attempts, so incrementing the index
    # cannot produce a free key.
    if attempt > 0 && s3path == last_key
      raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}" unless @overwrite
      log.warn "#{s3path} already exists, but will overwrite"
      break
    end

    attempt += 1
    last_key = s3path
    break unless @bucket.object(s3path).exists?
  end

  upload_file = Tempfile.new("s3-")
  upload_file.binmode
  begin
    @compressor.compress(chunk, upload_file)
    upload_file.rewind
    log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }

    put_options = {
      :body => upload_file,
      :content_type => @compressor.content_type,
      :storage_class => @storage_class,
    }
    # Optional settings are sent only when configured.
    {
      :server_side_encryption => @use_server_side_encryption,
      :ssekms_key_id => @ssekms_key_id,
      :acl => @acl,
    }.each do |name, value|
      put_options[name] = value if value
    end

    @bucket.object(s3path).put(put_options)
    @values_for_s3_object_chunk.delete(chunk.unique_id)

    # Warn when the chunk's time slice is older than the configured delay.
    if @warn_for_delay && Time.strptime(chunk.key, @time_slice_format) < Time.now - @warn_for_delay
      log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" }
    end
  ensure
    # Best-effort removal of the temporary file.
    upload_file.close(true) rescue nil
  end
end
|