Class: LogStash::Inputs::SQSS3
- Inherits: Threadable
  - Object
  - Threadable
  - LogStash::Inputs::SQSS3
- Includes:
  PluginMixins::AwsConfig::V2
- Defined in:
  lib/logstash/inputs/sqs_s3.rb
Overview
Fetches logs from AWS S3 buckets in response to object-created events delivered via SQS.
This plugin is based on the logstash-input-sqs plugin, but it doesn't emit the SQS event itself. Instead it assumes that the event is an S3 object-created event, then downloads and processes the referenced file.
Some issues of logstash-input-sqs, such as Logstash not shutting down properly, have been fixed in this plugin.
In contrast to logstash-input-sqs, this plugin uses the "Receive Message Wait Time" configured for the SQS queue in question; a good value is around 10 seconds, which keeps Logstash's shutdown time reasonable. Also use a "Default Visibility Timeout" that is high enough for log files to be downloaded and processed (5-10 minutes should cover most use cases): the plugin will avoid removing an event from the queue if the associated log file couldn't be correctly passed to the processing level of Logstash (e.g. the downloaded content size doesn't match the SQS event).
This plugin is meant for high-availability setups. In contrast to logstash-input-s3 you can safely use multiple Logstash nodes, since SQS ensures that each log file is processed only once and no file is lost on node failure or on downscaling of auto-scaling groups. (Use a "Message Retention Period" of at least 4 days for your SQS queue so you can survive a weekend of faulty log file processing.) The plugin does not delete objects from S3 buckets, so make sure to configure a reasonable "Lifecycle" for your buckets, one that keeps files for at least as many days as the "Message Retention Period".
A typical setup will contain some S3 buckets holding ELB, CloudTrail or other log files. These are configured to send object-created events to an SQS queue, which is configured as the source queue for this plugin. (The plugin supports gzipped content if it is marked with "content-encoding: gzip", as is the case for CloudTrail logs.)
The Logstash node therefore needs SQS permissions plus permission to download objects from the S3 buckets that send events to the queue. (If the Logstash nodes run on EC2, you should grant these permissions via an IAM instance role/instance profile.)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:Get*",
        "sqs:List*",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility*",
        "sqs:DeleteMessage*"
      ],
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:my-elb-log-queue"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*"
      ],
      "Resource": [
        "arn:aws:s3:::my-elb-logs",
        "arn:aws:s3:::my-elb-logs/*"
      ]
    }
  ]
}
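With the queue and permissions in place, a minimal pipeline using this input could look like the following. Only `queue` appears in the code shown on this page; `region` and `skip_delete` are assumptions based on the AwsConfig mixin and the SKIP_DELETE default, so treat this as a sketch rather than a complete option reference:

```
input {
  sqs_s3 {
    queue  => "my-elb-log-queue"
    region => "us-east-1"
  }
}
```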
Constant Summary collapse
- BACKOFF_SLEEP_TIME =
1
- BACKOFF_FACTOR =
2
- MAX_TIME_BEFORE_GIVING_UP =
60
- EVENT_SOURCE =
'aws:s3'
- EVENT_TYPE =
'ObjectCreated'
- MAX_MESSAGES_TO_FETCH =
# must be between 1 and 10 according to the AWS SDK documentation
10
- SENT_TIMESTAMP =
"SentTimestamp"
- SQS_ATTRIBUTES =
[SENT_TIMESTAMP]
- SKIP_DELETE =
false
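The backoff constants above are consumed by run_with_backoff, which #run calls but which is not shown on this page. A minimal sketch of what such a retry loop could look like, consistent with the constants; the `sleeper` parameter is added here purely to make the sketch testable (the real method presumably calls Kernel#sleep directly), and rescuing StandardError stands in for whatever SDK error class the plugin actually retries on:

```ruby
BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
MAX_TIME_BEFORE_GIVING_UP = 60

# Hypothetical sketch: retry the block with exponentially growing pauses
# (1 s, 2 s, 4 s, ...) and re-raise once the next pause would exceed the cap.
def run_with_backoff(sleeper: ->(s) { sleep(s) })
  sleep_time = BACKOFF_SLEEP_TIME
  begin
    yield
  rescue StandardError => e
    raise e if sleep_time > MAX_TIME_BEFORE_GIVING_UP
    sleeper.call(sleep_time)
    sleep_time *= BACKOFF_FACTOR
    retry
  end
end
```

With these constants the pauses come out to 1, 2, 4, 8, 16 and 32 seconds before the error is finally re-raised.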
Instance Attribute Summary collapse
-
#poller ⇒ Object
readonly
Returns the value of attribute poller.
-
#s3 ⇒ Object
readonly
Returns the value of attribute s3.
Instance Method Summary collapse
- #handle_message(message, queue) ⇒ Object
- #polling_options ⇒ Object
- #register ⇒ Object
- #run(queue) ⇒ Object
- #setup_queue ⇒ Object
Instance Attribute Details
#poller ⇒ Object (readonly)
Returns the value of attribute poller.
# File 'lib/logstash/inputs/sqs_s3.rb', line 123

def poller
  @poller
end
#s3 ⇒ Object (readonly)
Returns the value of attribute s3.
# File 'lib/logstash/inputs/sqs_s3.rb', line 124

def s3
  @s3
end
Instance Method Details
#handle_message(message, queue) ⇒ Object
# File 'lib/logstash/inputs/sqs_s3.rb', line 158

def handle_message(message, queue)
  hash = JSON.parse message.body
  # there may be test events sent from the s3 bucket which won't contain a Records array,
  # we will skip those events and remove them from queue
  if hash['Records'] then
    # typically there will be only 1 record per event, but since it is an array we will
    # treat it as if there could be more records
    hash['Records'].each do |record|
      # in case there are any events with Records that aren't s3 object-created events and can't
      # therefore be processed by this plugin, we will skip them and remove them from queue
      if record['eventSource'] == EVENT_SOURCE and record['eventName'].start_with?(EVENT_TYPE) then
        # try download and :skip_delete if it fails
        begin
          response = @s3.get_object(
            bucket: record['s3']['bucket']['name'],
            key: record['s3']['object']['key']
          )
        rescue => e
          @logger.warn("issuing :skip_delete on failed download", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e)
          throw :skip_delete
        end
        # verify downloaded content size
        if response.content_length == record['s3']['object']['size'] then
          body = response.body
          # if necessary unzip. Note: Firehose output is automatically gzipped but does NOT
          # include the content encoding or the .gz extension.
          if response.content_encoding == "gzip" or record['s3']['object']['key'].end_with?(".gz") or record['s3']['object']['key'].include?("/firehose/") then
            begin
              temp = MultipleFilesGzipReader.new(body)
            rescue => e
              @logger.warn("content is marked to be gzipped but can't unzip it, assuming plain text", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e)
              temp = body
            end
            body = temp
          end
          # process the plain text content
          begin
            # assess currently running load (in MB)
            @current_load += (record['s3']['object']['size'].to_f / 1000000)
            if record['s3']['object']['key'].include?("/firehose/") then
              lines = body.read.encode('UTF-8', 'binary').gsub("}{", "}\n{").split(/\n/)
            else
              lines = body.read.encode('UTF-8', 'binary', invalid: :replace, undef: :replace, replace: "\u2370").split(/\n/)
            end
            # set the codec to json if required, otherwise the default is plain text
            # (Firehose content is always in JSON format)
            if response.content_type == "application/json" or record['s3']['object']['key'].include?("/firehose/") then
              @codec = @jsonCodec
              if response.content_encoding != "gzip" then
                # If it's json in plain text, need to remove whitespaces
                # TODO...
              end
            else
              @codec = @plainCodec
              #lines = lines.split(/\n/)
            end
            lines.each do |line|
              @codec.decode(line) do |event|
                decorate(event)
                event.set('[@metadata][s3_bucket_name]', record['s3']['bucket']['name'])
                event.set('[@metadata][s3_object_key]', record['s3']['object']['key'])
                event.set('[@metadata][event_type]', 's3')
                event.set('[@metadata][s3_object_encoding]', response.content_encoding)
                event.set('[@metadata][s3_object_type]', response.content_type)
                queue << event
              end
            end
            event = LogStash::Event.new()
            event.set('[@metadata][event_type]', 'complete')
            event.set('[@metadata][' + @receipt_handle + ']', message.receipt_handle) if @receipt_handle
            event.set('[@metadata][' + @message_id + ']', message.message_id) if @message_id
            event.set('[@metadata][s3_object_key]', record['s3']['object']['key'])
            event.set('[@metadata][' + @sent_timestamp_field + ']', message.attributes[SENT_TIMESTAMP]) if @sent_timestamp_field
            queue << event
          rescue => e
            @logger.warn("issuing :skip_delete on failed plain text processing", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e)
            throw :skip_delete
          end
        # otherwise try again later
        else
          @logger.warn("issuing :skip_delete on wrong download content size", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :download_size => response.content_length, :expected => record['s3']['object']['size'])
          throw :skip_delete
        end
      end
    end
  end
end
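Within #handle_message, objects under a /firehose/ prefix get special treatment: Firehose writes JSON records back to back with no separator, so the handler inserts a newline at every `}{` boundary before splitting into lines. A standalone sketch of that transformation (the helper name is made up for illustration; note this approach would mis-split if the literal sequence `}{` ever occurred inside a string value):

```ruby
require "json"

# Hypothetical helper: turn concatenated Firehose JSON records
# ("{...}{...}{...}") into one JSON document per element, the same way
# #handle_message does with gsub("}{", "}\n{") followed by split(/\n/).
def split_firehose_records(body)
  body.gsub("}{", "}\n{").split(/\n/)
end
```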
#polling_options ⇒ Object
# File 'lib/logstash/inputs/sqs_s3.rb', line 146

def polling_options
  {
    # the number of messages to fetch in a single api call
    :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
    :attribute_names => SQS_ATTRIBUTES,
    # we will use the queue's setting, a good value is 10 seconds
    # (to ensure fast logstash shutdown on the one hand and few api calls on the other hand)
    :wait_time_seconds => nil,
    :skip_delete => @skip_delete
  }
end
#register ⇒ Object
# File 'lib/logstash/inputs/sqs_s3.rb', line 126

def register
  require "aws-sdk"
  @logger.info("Registering SQS input", :queue => @queue)
  @logger.info("Skip Delete", :skip_delete => @skip_delete)
  @current_load = 0.0
  @jsonCodec = LogStash::Codecs::JSON.new
  @plainCodec = LogStash::Codecs::Plain.new
  setup_queue
end
#run(queue) ⇒ Object
# File 'lib/logstash/inputs/sqs_s3.rb', line 253

def run(queue)
  # ensure we can stop logstash correctly
  poller.before_request do |stats|
    if stop? then
      @logger.warn("issuing :stop_polling on stop?", :queue => @queue)
      # this can take up to "Receive Message Wait Time" (of the sqs queue) seconds to be recognized
      throw :stop_polling
    end
    # throttle requests if overloaded by big files
    if @current_load > @max_load_before_throttling / 1000000 then
      throttle_seconds_sleep = @seconds_to_throttle * (@current_load / (@max_load_before_throttling.to_f / 1000000)).floor
      @logger.warn("Current load has exceeded " + (@max_load_before_throttling.to_f / 1000000).to_s + " MB. Load is currently: " + @current_load.to_s + ". Throttling back by " + throttle_seconds_sleep.to_s)
      # cap the throttle time to 1 minute
      if throttle_seconds_sleep != 0 then
        if throttle_seconds_sleep > 60 then
          sleep(60)
        else
          sleep(throttle_seconds_sleep)
        end
      end
    end
    # reset load to 0
    @current_load = 0.0
  end
  # poll a message and process it
  run_with_backoff do
    poller.poll(polling_options) do |messages|
      messages.each do |message|
        handle_message(message, queue)
      end
    end
  end
end
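The throttling arithmetic in #run mixes units: @current_load is tracked in MB (see #handle_message), while @max_load_before_throttling is configured in bytes, hence the repeated division by 1000000. Pulled out as a pure function (the function and parameter names here are assumptions for illustration, not part of the plugin's API), the logic amounts to:

```ruby
# Hypothetical extraction of the throttle computation from #run: returns how
# long to pause before the next SQS poll, scaling with how far the current
# load (in MB) exceeds the configured maximum (in bytes), capped at 1 minute.
def throttle_sleep_seconds(current_load_mb, max_load_bytes, seconds_to_throttle)
  max_load_mb = max_load_bytes.to_f / 1_000_000
  return 0 if current_load_mb <= max_load_mb
  sleep_seconds = seconds_to_throttle * (current_load_mb / max_load_mb).floor
  [sleep_seconds, 60].min # cap the throttle time at one minute, as #run does
end
```

For example, with a 10 MB limit and 5 seconds of throttle per multiple of the limit, a 25 MB load yields a 10-second pause, and any sufficiently large load saturates at the 60-second cap.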
#setup_queue ⇒ Object
# File 'lib/logstash/inputs/sqs_s3.rb', line 136

def setup_queue
  aws_sqs_client = Aws::SQS::Client.new()
  queue_url = aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url]
  @poller = Aws::SQS::QueuePoller.new(queue_url, :client => aws_sqs_client)
  @s3 = Aws::S3::Client.new()
rescue Aws::SQS::Errors::ServiceError => e
  @logger.error("Cannot establish connection to Amazon SQS", :error => e)
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end