Class: LogStash::Outputs::S3
- Inherits: Base
  - Object
  - Base
  - LogStash::Outputs::S3
- Includes:
- PluginMixins::AwsConfig::V2
- Defined in:
- lib/logstash/outputs/s3.rb,
lib/logstash/outputs/s3/uploader.rb,
lib/logstash/outputs/s3/path_validator.rb,
lib/logstash/outputs/s3/temporary_file.rb,
lib/logstash/outputs/s3/file_repository.rb,
lib/logstash/outputs/s3/size_rotation_policy.rb,
lib/logstash/outputs/s3/time_rotation_policy.rb,
lib/logstash/outputs/s3/temporary_file_factory.rb,
lib/logstash/outputs/s3/writable_directory_validator.rb,
lib/logstash/outputs/s3/size_and_time_rotation_policy.rb,
lib/logstash/outputs/s3/write_bucket_permission_validator.rb
Overview
INFORMATION:
This plugin batches and uploads logstash events into Amazon Simple Storage Service (Amazon S3).
Requirements:
- Amazon S3 Bucket and S3 Access Permissions (typically access_key_id and secret_access_key)
- S3 PutObject permission
S3 outputs create temporary files in the OS temporary directory; you can specify where to save them using the `temporary_directory` option.
S3 output files have the following format:
ls.s3.312bc026-2f5d-49bc-ae9f-5940cf4ad9a6.2013-04-18T10.00.tag_hello.part0.txt
|=======
| ls.s3 | indicates the logstash s3 plugin |
| 312bc026-2f5d-49bc-ae9f-5940cf4ad9a6 | a new, random uuid per file |
| 2013-04-18T10.00 | the file's timestamp, present whenever you specify time_file |
| tag_hello | the event's tag |
| part0 | the part number; if you set size_file, more parts are generated when file.size > size_file. When a file is full it is pushed to the bucket and then deleted from the temporary directory. If a file is empty, it is simply deleted; empty files are not pushed. |
|=======
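To make the naming layout above concrete, here is a minimal sketch that assembles a name from the same pieces. The helper `s3_part_name` and its keyword arguments are hypothetical and exist only for illustration; the plugin builds its names internally (see TemporaryFileFactory) and may differ in detail.

```ruby
require "securerandom"

# Hypothetical helper (not part of the plugin): joins the documented pieces
# "ls.s3", a uuid, a timestamp, an optional tag and a part counter.
def s3_part_name(time:, tag: nil, part: 0, uuid: SecureRandom.uuid)
  pieces = ["ls.s3", uuid, time.strftime("%Y-%m-%dT%H.%M")]
  pieces << "tag_#{tag}" if tag   # the tag segment appears only when a tag is given
  pieces << "part#{part}"
  pieces.join(".") + ".txt"
end

puts s3_part_name(
  time: Time.utc(2013, 4, 18, 10, 0),
  tag: "hello",
  uuid: "312bc026-2f5d-49bc-ae9f-5940cf4ad9a6"
)
# prints ls.s3.312bc026-2f5d-49bc-ae9f-5940cf4ad9a6.2013-04-18T10.00.tag_hello.part0.txt
```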
Crash Recovery:
- This plugin will recover and upload temporary log files after a crash or abnormal termination when `restore` is set to true.
Note regarding time_file and size_file:
Both the time_file and size_file settings can trigger a log "file rotation". A log rotation pushes the current log "part" to S3 and deletes it from local temporary storage.
If you specify BOTH size_file and time_file, one file is created for each tag (if specified). When EITHER time_file minutes have elapsed OR the log file size > size_file, a log rotation is triggered.
If you ONLY specify time_file but NOT size_file, one file is created for each tag (if specified). When time_file minutes have elapsed, a log rotation is triggered.
If you ONLY specify size_file but NOT time_file, one file is created for each tag (if specified). When the size of the log file part > size_file, a log rotation is triggered.
If NEITHER size_file nor time_file is specified, ONLY one file is created for each tag (if specified). WARNING: Since no log rotation is triggered, the S3 upload will only occur when logstash restarts.
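The four cases above reduce to one either/or check. The sketch below is a hypothetical stand-in written for illustration, not the plugin's SizeRotationPolicy, TimeRotationPolicy or SizeAndTimeRotationPolicy classes; the method name `rotate?` and its arguments are assumptions, as is the choice to treat nil or 0 as "trigger disabled".

```ruby
# Hypothetical rotation check. size_file is in bytes, time_file in minutes;
# a nil or 0 setting is assumed to disable that trigger.
def rotate?(size_bytes:, age_seconds:, size_file: nil, time_file: nil)
  by_size = size_file.to_i > 0 && size_bytes > size_file
  by_time = time_file.to_i > 0 && age_seconds >= time_file * 60
  by_size || by_time
end

puts rotate?(size_bytes: 4096, age_seconds: 10, size_file: 2048, time_file: 5)  # size trigger
puts rotate?(size_bytes: 100, age_seconds: 300, size_file: 2048, time_file: 5)  # time trigger
puts rotate?(size_bytes: 100, age_seconds: 10, size_file: 2048, time_file: 5)   # neither yet
puts rotate?(size_bytes: 10_000, age_seconds: 10_000)                           # nothing configured
```

The last call returns false however large or old the file is, which is the situation the WARNING describes.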
#### Usage:
This is an example of logstash config:
output {
  s3 {
    access_key_id => "crazy_key"              # (required)
    secret_access_key => "monkey_access_key"  # (required)
    region => "eu-west-1"                     # (optional, default = "us-east-1")
    bucket => "your_bucket"                   # (required)
    size_file => 2048                         # (optional) - Bytes
    time_file => 5                            # (optional) - Minutes
    codec => "plain"                          # (optional)
    canned_acl => "private"                   # (optional. Options are "private", "public-read", "public-read-write", "authenticated-read", "aws-exec-read", "bucket-owner-read", "bucket-owner-full-control", "log-delivery-write". Defaults to "private")
  }
}
Defined Under Namespace
Classes: FileRepository, PathValidator, SizeAndTimeRotationPolicy, SizeRotationPolicy, TemporaryFile, TemporaryFileFactory, TimeRotationPolicy, Uploader, WritableDirectoryValidator, WriteBucketPermissionValidator
Constant Summary
- PREFIX_KEY_NORMALIZE_CHARACTER = "_"
- PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15
- CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new({ :min_threads => 1, :max_threads => 2, :fallback_policy => :caller_runs })
Instance Method Summary
- #close ⇒ Object
- #full_options ⇒ Object
- #multi_receive_encoded(events_and_encoded) ⇒ Object
- #normalize_key(prefix_key) ⇒ Object
- #register ⇒ Object
- #restore ⇒ Object
  IMPORTANT: if you use multiple instances of s3, you should specify "restore => true" on one of them and "restore => false" on the others.
- #symbolize_keys(hash) ⇒ Object
- #symbolized_settings ⇒ Object
- #upload_options ⇒ Object
Instance Method Details
#close ⇒ Object
# File 'lib/logstash/outputs/s3.rb', line 251

def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.debug("Uploading current workspace")

  # The plugin has stopped receiving new events, but we still have
  # data on disk, so let's make sure it gets to S3.
  # If Logstash gets interrupted, the `restore_from_crash` method (when `restore` is set to true)
  # will pick up the content in the temporary directory and upload it.
  # This will block the shutdown until all uploads are done or the user force quits.
  @file_repository.each_files do |file|
    upload_file(file)
  end

  @file_repository.shutdown

  @uploader.stop # wait until all the current uploads are complete
  @crash_uploader.stop if @restore # we might still have work to do for recovery, so wait until we are done
end
#full_options ⇒ Object
# File 'lib/logstash/outputs/s3.rb', line 271

def full_options
  options = aws_options_hash || {}
  options[:signature_version] = @signature_version if @signature_version
  symbolized_settings.merge(options)
end
#multi_receive_encoded(events_and_encoded) ⇒ Object
# File 'lib/logstash/outputs/s3.rb', line 230

def multi_receive_encoded(events_and_encoded)
  prefix_written_to = Set.new

  events_and_encoded.each do |event, encoded|
    prefix_key = normalize_key(event.sprintf(@prefix))
    prefix_written_to << prefix_key

    begin
      @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
      # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
      # Log the error and rethrow it.
    rescue Errno::ENOSPC => e
      @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
      raise e
    end
  end

  # Groups IO calls to optimize fstat checks
  rotate_if_needed(prefix_written_to)
end
#normalize_key(prefix_key) ⇒ Object
# File 'lib/logstash/outputs/s3.rb', line 289

def normalize_key(prefix_key)
  prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
end
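Because multi_receive_encoded expands the prefix per event with event.sprintf, the resulting key can contain characters that the static prefix check in register never saw, which is presumably why it is normalized again here. A minimal standalone sketch of the substitution follows; the character set in `INVALID_PREFIX_CHARS` is an assumption made for illustration, since this page does not show what PathValidator.matches_re actually matches.

```ruby
# Assumption: an illustrative set of characters rejected in a prefix.
# The real pattern comes from PathValidator.matches_re and may differ.
INVALID_PREFIX_CHARS = /[\^`><]/
NORMALIZE_WITH = "_" # same value as PREFIX_KEY_NORMALIZE_CHARACTER

# Hypothetical equivalent of normalize_key that needs no plugin classes.
def normalize_prefix(prefix_key)
  prefix_key.gsub(INVALID_PREFIX_CHARS, NORMALIZE_WITH)
end

puts normalize_prefix("logs/<app>/2013") # prints logs/_app_/2013
puts normalize_prefix("logs/app/2013")   # unchanged: logs/app/2013
```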
#register ⇒ Object
# File 'lib/logstash/outputs/s3.rb', line 188

def register
  # I've moved the validation of the items into custom classes
  # to prepare for the new config validation that will be part of the core so the core can
  # be moved easily.
  unless @prefix.empty?
    if !PathValidator.valid?(prefix)
      raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
    end
  end

  if !WritableDirectoryValidator.valid?(@temporary_directory)
    raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
  end

  if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.new(@logger).valid?(bucket_resource, upload_options)
    raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root bucket `#{@bucket}`, check your credentials or your permissions."
  end

  if @time_file.nil? && @size_file.nil? || @size_file == 0 && @time_file == 0
    raise LogStash::ConfigurationError, "The S3 plugin must have at least one of time_file or size_file set to a value greater than 0"
  end

  @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

  @rotation = rotation_strategy

  executor = Concurrent::ThreadPoolExecutor.new({ :min_threads => 1,
                                                  :max_threads => @upload_workers_count,
                                                  :max_queue => @upload_queue_size,
                                                  :fallback_policy => :caller_runs })

  @uploader = Uploader.new(bucket_resource, @logger, executor)

  # Restoring from a crash will use a new threadpool to slowly recover.
  # New events should have more priority.
  restore_from_crash if @restore

  # If we need time-based rotation, we need to do a periodic check on the files
  # to take care of files that were not updated recently.
  start_periodic_check if @rotation.needs_periodic?
end
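The time_file/size_file guard in register mixes `&&` and `||` without parentheses, so it is worth checking which combinations it rejects. The sketch below copies only that boolean expression into a hypothetical predicate, `rotation_config_invalid?`, which does not exist in the plugin; whether the plugin's config defaults ever let nil reach this check is not shown on this page.

```ruby
# Hypothetical predicate wrapping the same expression as the guard in register.
# In Ruby, && binds tighter than ||, so this reads as
# (both nil) || (both zero).
def rotation_config_invalid?(time_file, size_file)
  time_file.nil? && size_file.nil? || size_file == 0 && time_file == 0
end

puts rotation_config_invalid?(nil, nil)  # true:  neither setting given
puts rotation_config_invalid?(0, 0)      # true:  both explicitly zero
puts rotation_config_invalid?(5, nil)    # false: time_file alone is enough
puts rotation_config_invalid?(0, nil)    # false: a zero/nil mix is not caught by this expression
```

The last case suggests that a mix of 0 and nil passes this particular check, even though neither value is "greater than 0" as the error message requires.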
#restore ⇒ Object
IMPORTANT: if you use multiple instances of s3, you should specify "restore => true" on one of them and "restore => false" on the others. This is a hack to avoid destroying the new files after restoring the initial files. If you do not specify "restore => true", the files are not sent to the bucket when logstash crashes or is restarted, for example if you have a single instance.

# File 'lib/logstash/outputs/s3.rb', line 126

config :restore, :validate => :boolean, :default => true
#symbolize_keys(hash) ⇒ Object
# File 'lib/logstash/outputs/s3.rb', line 281

def symbolize_keys(hash)
  return hash unless hash.is_a?(Hash)
  symbolized = {}
  hash.each { |key, value| symbolized[key.to_sym] = symbolize_keys(value) }
  symbolized
end
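A short usage example may help show what the recursion does and does not cover. The method body below is copied from the listing above so the snippet runs on its own; the keys in `settings` are made up for illustration and are not a list of supported options.

```ruby
def symbolize_keys(hash)
  return hash unless hash.is_a?(Hash)
  symbolized = {}
  hash.each { |key, value| symbolized[key.to_sym] = symbolize_keys(value) }
  symbolized
end

# Illustrative input only; these keys are assumptions, not documented settings.
settings = {
  "outer"  => { "inner" => 3 },   # nested hashes are converted recursively
  "list"   => [{ "kept" => 1 }],  # hashes inside arrays are left untouched
  "scalar" => true
}

result = symbolize_keys(settings)
p result.keys          # top-level keys are now symbols
p result[:outer].keys  # so are keys of nested hashes
p result[:list]        # array contents still use string keys
```

Since the method only descends into values that are themselves hashes, string keys nested inside an array survive unchanged.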
#symbolized_settings ⇒ Object
# File 'lib/logstash/outputs/s3.rb', line 277

def symbolized_settings
  @symbolized_settings ||= symbolize_keys(@additional_settings)
end
#upload_options ⇒ Object
# File 'lib/logstash/outputs/s3.rb', line 293

def upload_options
  {
    :acl => @canned_acl,
    :server_side_encryption => @server_side_encryption ? @server_side_encryption_algorithm : nil,
    :ssekms_key_id => @server_side_encryption_algorithm == "aws:kms" ? @ssekms_key_id : nil,
    :storage_class => @storage_class,
    :content_encoding => @encoding == "gzip" ? "gzip" : nil
  }
end
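To see how the conditional entries resolve, the same hash can be rebuilt from plain arguments. This is a sketch under stated assumptions: `build_upload_options` is a hypothetical function, and the argument values used below ("AES256", "STANDARD", "none") are illustrative, since this page does not list the settings' defaults or allowed values.

```ruby
# Hypothetical standalone version of upload_options, taking the instance
# variables as keyword arguments instead.
def build_upload_options(canned_acl:, encoding:, storage_class:,
                         sse: false, sse_algorithm: "AES256", kms_key_id: nil)
  {
    :acl => canned_acl,
    :server_side_encryption => sse ? sse_algorithm : nil,
    :ssekms_key_id => sse_algorithm == "aws:kms" ? kms_key_id : nil,
    :storage_class => storage_class,
    :content_encoding => encoding == "gzip" ? "gzip" : nil
  }
end

plain = build_upload_options(canned_acl: "private", encoding: "none", storage_class: "STANDARD")
p plain[:server_side_encryption] # nil, encryption flag is off
p plain[:content_encoding]       # nil, only "gzip" sets this entry

kms = build_upload_options(canned_acl: "private", encoding: "gzip", storage_class: "STANDARD",
                           sse_algorithm: "aws:kms", kms_key_id: "example-key-id")
p kms[:ssekms_key_id]            # set because the algorithm is "aws:kms"
p kms[:server_side_encryption]   # still nil, because the sse flag is false
```

As written in the listing, the key id entry depends only on the algorithm and not on the encryption flag, so the second call yields a key id without enabling server-side encryption.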