Class: Fluent::S3Output
- Inherits: TimeSlicedOutput (Object > TimeSlicedOutput > Fluent::S3Output)
- Includes: Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_s3.rb,
lib/fluent/plugin/s3_compressor_lzo.rb,
lib/fluent/plugin/s3_compressor_lzma2.rb,
lib/fluent/plugin/s3_compressor_gzip_command.rb
Defined Under Namespace
Classes: Compressor, GzipCommandCompressor, GzipCompressor, JsonCompressor, LZMA2Compressor, LZOCompressor, TextCompressor
Constant Summary

- MAX_HEX_RANDOM_LENGTH = 16
Constructor Details
#initialize ⇒ S3Output

Returns a new instance of S3Output.
# File 'lib/fluent/plugin/out_s3.rb', line 8
def initialize
  super
  require 'aws-sdk-resources'
  require 'zlib'
  require 'time'
  require 'tempfile'
  @compressor = nil
end
Instance Attribute Details
#bucket ⇒ Object
Returns the value of attribute bucket.
# File 'lib/fluent/plugin/out_s3.rb', line 109
def bucket
  @bucket
end
Class Method Details
.register_compressor(name, compressor) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 403
def self.register_compressor(name, compressor)
  COMPRESSOR_REGISTRY.register(name, compressor)
end
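As a sketch of how this hook can be used (the class name, extension, and media type below are hypothetical; the ext/content_type/compress interface follows the bundled compressors such as GzipCompressor):

class ZstdCompressor < Fluent::S3Output::Compressor
  def ext
    'zst'
  end

  def content_type
    'application/x-zstd'
  end

  def compress(chunk, tmp)
    # hypothetical: stream the chunk through a zstd encoder into tmp
  end
end

Fluent::S3Output.register_compressor('zstd', ZstdCompressor)

Once registered, the compressor becomes selectable through the store_as parameter, which configure resolves via COMPRESSOR_REGISTRY.lookup.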
Instance Method Details
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 118
def configure(conf)
  super

  if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
    raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  begin
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
  rescue
    $log.warn "#{@store_as} not found. Use 'text' instead"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)

  if @localtime
    @path_slicer = Proc.new {|path|
      Time.now.strftime(path)
    }
  else
    @path_slicer = Proc.new {|path|
      Time.now.utc.strftime(path)
    }
  end

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  @storage_class = "REDUCED_REDUNDANCY" if @reduced_redundancy
  @values_for_s3_object_chunk = {}
end
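For illustration, a minimal match section exercising these parameters might look like the following (bucket name and path are placeholders; store_as values resolve through COMPRESSOR_REGISTRY and fall back to 'text' when unknown):

<match s3.**>
  @type s3
  s3_bucket my-log-bucket
  s3_region us-east-1
  path logs/%Y/%m/%d/
  store_as gzip
  format json
</match>

The %Y/%m/%d placeholders in path are expanded by @path_slicer via strftime, in UTC unless localtime is set.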
#desc(description) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 21
# Intentional no-op so the plugin loads on Fluentd versions
# that do not support desc for config parameters.
def desc(description)
end
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 174
def format(tag, time, record)
  @formatter.format(tag, time, record)
end
#placeholders ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 114
def placeholders
  [:percent]
end
#start ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 154
def start
  super
  options = setup_credentials
  options[:region] = @s3_region if @s3_region
  options[:endpoint] = @s3_endpoint if @s3_endpoint
  options[:http_proxy] = @proxy_uri if @proxy_uri
  options[:force_path_style] = @force_path_style
  options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?

  s3_client = Aws::S3::Client.new(options)
  @s3 = Aws::S3::Resource.new(:client => s3_client)
  @bucket = @s3.bucket(@s3_bucket)

  check_apikeys if @check_apikey_on_start
  ensure_bucket

  # each random byte yields two hex characters, so round up
  @hex_random_n = (@hex_random_length + 1) / 2
end
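Read on its own, start is standard aws-sdk v2 wiring. A self-contained sketch of the same calls (region, credentials, and bucket name are placeholder assumptions; a real deployment obtains credentials through setup_credentials):

require 'aws-sdk-resources'

options = {
  :region => 'us-east-1',               # s3_region
  :access_key_id => 'AKIA...',          # placeholder credentials
  :secret_access_key => '...',
}
s3_client = Aws::S3::Client.new(options)
bucket = Aws::S3::Resource.new(:client => s3_client).bucket('my-log-bucket')
bucket.object('logs/test.gz').exists?   # the probe write uses to avoid key collisions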
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_s3.rb', line 178
def write(chunk)
  i = 0
  previous_path = nil

  # Generate an object key, bumping %{index} until the key does not
  # already exist in the bucket.
  begin
    path = @path_slicer.call(@path)

    @values_for_s3_object_chunk[chunk.unique_id] ||= {
      "hex_random" => hex_random(chunk),
    }
    values_for_s3_object_key = {
      "path" => path,
      "time_slice" => chunk.key,
      "file_extension" => @compressor.ext,
      "index" => i,
      "uuid_flush" => uuid_random,
    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])

    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
      values_for_s3_object_key[expr[2...expr.size-1]]
    }
    if (i > 0) && (s3path == previous_path)
      if @overwrite
        log.warn "#{s3path} already exists, but will overwrite"
        break
      else
        raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
      end
    end

    i += 1
    previous_path = s3path
  end while @bucket.object(s3path).exists?

  # Compress the chunk into a tempfile, then upload it.
  tmp = Tempfile.new("s3-")
  tmp.binmode
  begin
    @compressor.compress(chunk, tmp)
    tmp.rewind
    log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }

    put_options = {:body => tmp, :content_type => @compressor.content_type, :storage_class => @storage_class}
    put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
    put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
    @bucket.object(s3path).put(put_options)

    @values_for_s3_object_chunk.delete(chunk.unique_id)
  ensure
    tmp.close(true) rescue nil
  end
end
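To make the key expansion concrete, here is the same gsub applied to the documented default s3_object_key_format with illustrative values:

key_format = "%{path}%{time_slice}_%{index}.%{file_extension}"
values = {
  "path"           => "logs/2016/01/15/",  # @path_slicer output
  "time_slice"     => "20160115",          # chunk.key
  "index"          => 0,
  "file_extension" => "gz",                # @compressor.ext
}
key_format.gsub(%r(%{[^}]+})) { |expr|
  values[expr[2...expr.size-1]]
}
# => "logs/2016/01/15/20160115_0.gz"

The surrounding begin/end-while loop increments %{index} until the generated key is unused; if the key never changes across iterations, write either overwrites or raises depending on the overwrite setting.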