Class: LogStash::Inputs::SQSS3

Inherits:
  Threadable
Includes:
  PluginMixins::AwsConfig::V2
Defined in:
  lib/logstash/inputs/sqs_s3.rb

Overview

Get logs from AWS S3 buckets, as announced by object-created events delivered via SQS.

This plugin is based on the logstash-input-sqs plugin, but it doesn't log the SQS event itself. Instead it assumes that the event is an S3 object-created event and downloads and processes the referenced file.

Some issues of logstash-input-sqs, like logstash not shutting down properly, have been fixed for this plugin.

In contrast to logstash-input-sqs, this plugin uses the “Receive Message Wait Time” configured for the SQS queue in question; a good value is around 10 seconds to ensure a reasonable shutdown time for Logstash. Also use a “Default Visibility Timeout” that is high enough for log files to be downloaded and processed (5-10 minutes should be sufficient for most use cases). The plugin avoids removing an event from the queue if the associated log file couldn't be correctly passed to the processing stage of Logstash (e.g. the downloaded content size doesn't match the SQS event).
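As a minimal sketch (not part of the plugin), these two queue settings could be applied with the AWS SDK for Ruby; the queue URL and the concrete values below are only illustrative:

require "aws-sdk"

sqs = Aws::SQS::Client.new(region: "us-east-1")
sqs.set_queue_attributes(
  queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/my-elb-log-queue",
  attributes: {
    "ReceiveMessageWaitTimeSeconds" => "10",  # long polling; keeps Logstash shutdown responsive
    "VisibilityTimeout"             => "600"  # 10 minutes to download and process a log file
  }
)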

This plugin is meant for high availability setups; in contrast to logstash-input-s3 you can safely use multiple Logstash nodes, since the use of SQS ensures that each log file is processed only once and no file gets lost on node failure or downscaling in auto-scaling groups. (You should use a “Message Retention Period” of >= 4 days for your SQS queue so you can survive a weekend of faulty log file processing.) The plugin does not delete objects from S3 buckets, so make sure a reasonable “Lifecycle” is configured for your buckets, one that keeps the files for at least the “Message Retention Period”. A sketch of both settings follows.
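A minimal sketch (not part of the plugin) of those two retention settings with the AWS SDK for Ruby; the queue URL, bucket name and concrete periods are only illustrative:

require "aws-sdk"

Aws::SQS::Client.new.set_queue_attributes(
  queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/my-elb-log-queue",
  attributes: { "MessageRetentionPeriod" => (4 * 24 * 3600).to_s }  # 4 days, in seconds
)

Aws::S3::Client.new.put_bucket_lifecycle_configuration(
  bucket: "my-elb-logs",
  lifecycle_configuration: {
    rules: [{
      id: "expire-delivered-logs",
      status: "Enabled",
      filter: { prefix: "" },     # apply to the whole bucket
      expiration: { days: 7 }     # keep files longer than the retention period
    }]
  }
)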

A typical setup will contain some S3 buckets holding ELB, CloudTrail or other log files. These are configured to send object-created events to an SQS queue, which is configured as the source queue for this plugin. (The plugin supports gzipped content if it is marked with “Content-Encoding: gzip”, as is the case for CloudTrail logs.)
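Wiring a bucket to the queue could look like the following minimal sketch (not part of the plugin; bucket name and ARN are illustrative). Note that the queue policy must additionally allow s3.amazonaws.com to send messages to the queue:

require "aws-sdk"

Aws::S3::Client.new.put_bucket_notification_configuration(
  bucket: "my-elb-logs",
  notification_configuration: {
    queue_configurations: [{
      queue_arn: "arn:aws:sqs:us-east-1:123456789012:my-elb-log-queue",
      events: ["s3:ObjectCreated:*"]   # every object-created variant
    }]
  }
)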

The Logstash node therefore needs SQS permissions plus the permissions to download objects from the S3 buckets that send events to the queue. (If the Logstash nodes are running on EC2, you should use an IAM instance role to provide these permissions.) An example policy:


{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:Get*",
                "sqs:List*",
                "sqs:ReceiveMessage",
                "sqs:ChangeMessageVisibility*",
                "sqs:DeleteMessage*"
            ],
            "Resource": [
                "arn:aws:sqs:us-east-1:123456789012:my-elb-log-queue"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:Get*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::my-elb-logs",
                "arn:aws:s3:::my-elb-logs/*"
            ]
        }
    ]
}

Constant Summary

BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
MAX_TIME_BEFORE_GIVING_UP = 60
EVENT_SOURCE = 'aws:s3'
EVENT_TYPE = 'ObjectCreated'
MAX_MESSAGES_TO_FETCH = 10
  # must be between 1 and 10 according to the AWS SDK documentation
SENT_TIMESTAMP = "SentTimestamp"
SQS_ATTRIBUTES = [SENT_TIMESTAMP]
SKIP_DELETE = false
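The three backoff constants are consumed by run_with_backoff, which #run wraps around the polling loop but which is not shown in this excerpt. A plausible sketch, assuming it retries SQS service errors with exponential backoff, could look like this:

def run_with_backoff(sleep_time = BACKOFF_SLEEP_TIME, &block)
  block.call
rescue Aws::SQS::Errors::ServiceError => e
  # give up once the next sleep would exceed the configured maximum
  if sleep_time > MAX_TIME_BEFORE_GIVING_UP
    @logger.error("SQS error, giving up", :error => e)
    raise e
  end
  @logger.warn("SQS error, retrying with backoff", :sleep_time => sleep_time, :error => e)
  sleep(sleep_time)
  run_with_backoff(sleep_time * BACKOFF_FACTOR, &block)
end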

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#poller ⇒ Object (readonly)

Returns the value of attribute poller.



# File 'lib/logstash/inputs/sqs_s3.rb', line 123

def poller
  @poller
end

#s3 ⇒ Object (readonly)

Returns the value of attribute s3.



# File 'lib/logstash/inputs/sqs_s3.rb', line 124

def s3
  @s3
end

Instance Method Details

#handle_message(message, queue) ⇒ Object



# File 'lib/logstash/inputs/sqs_s3.rb', line 158

def handle_message(message, queue)
  hash = JSON.parse message.body
  # there may be test events sent from the s3 bucket which won't contain a Records array,
  # we will skip those events and remove them from queue
  if hash['Records'] then
    # typically there will be only 1 record per event, but since it is an array we will
    # treat it as if there could be more records
    hash['Records'].each do |record|
      # in case there are any events with Records that aren't s3 object-created events and can't therefore be
      # processed by this plugin, we will skip them and remove them from queue
      if record['eventSource'] == EVENT_SOURCE and record['eventName'].start_with?(EVENT_TYPE) then
        # try download and :skip_delete if it fails
        begin
          response = @s3.get_object(
            bucket: record['s3']['bucket']['name'],
            key: record['s3']['object']['key']
          )
        rescue => e
          @logger.warn("issuing :skip_delete on failed download", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e)
          throw :skip_delete
        end
        # verify downloaded content size
        if response.content_length == record['s3']['object']['size'] then
          body = response.body
          # if necessary unzip. Note: Firehose is automatically gzipped but does NOT include the content encoding or the extension.
          if response.content_encoding == "gzip" or record['s3']['object']['key'].end_with?(".gz") or record['s3']['object']['key'].include?("/firehose/") then
            begin
              temp = MultipleFilesGzipReader.new(body)
            rescue => e
              @logger.warn("content is marked to be gzipped but can't unzip it, assuming plain text", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e)
              temp = body
            end
            body = temp
          end
          # process the plain text content
          begin
            # assess currently running load (in MB)
            @current_load += (record['s3']['object']['size'].to_f / 1000000)

            if record['s3']['object']['key'].include?("/firehose/") then
              lines = body.read.encode('UTF-8', 'binary').gsub("}{", "}\n{").split(/\n/)
            else
              lines = body.read.encode('UTF-8', 'binary', invalid: :replace, undef: :replace, replace: "\u2370").split(/\n/)
            end

            # Set the codec to json if required, otherwise the default is plain text. Firehose is always in JSON format.
            if response.content_type == "application/json" or record['s3']['object']['key'].include?("/firehose/") then
              @codec = @jsonCodec

              if response.content_encoding != "gzip" then
                # If it's json in plain text, need to remove whitespaces
                # TODO...
              end
            else
              @codec = @plainCodec
              #lines = lines.split(/\n/)
            end

            lines.each do |line|
              @codec.decode(line) do |event|
                decorate(event)

                event.set('[@metadata][s3_bucket_name]', record['s3']['bucket']['name'])
                event.set('[@metadata][s3_object_key]', record['s3']['object']['key'])
                event.set('[@metadata][event_type]', 's3')
                event.set('[@metadata][s3_object_encoding]', response.content_encoding)
                event.set('[@metadata][s3_object_type]', response.content_type)

                queue << event
              end
            end

            event = LogStash::Event.new()
            event.set('[@metadata][event_type]', 'complete')
            event.set('[@metadata][' + @receipt_handle + ']', message.receipt_handle) if @receipt_handle
            event.set('[@metadata][' + @message_id + ']', message.message_id) if @message_id
            event.set('[@metadata][s3_object_key]', record['s3']['object']['key'])
            event.set('[@metadata][' + @sent_timestamp_field + ']', convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field

            queue << event
          rescue => e
            @logger.warn("issuing :skip_delete on failed plain text processing", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e)
            throw :skip_delete
          end
        # otherwise try again later
        else
          @logger.warn("issuing :skip_delete on wrong download content size", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'],
            :download_size => response.content_length, :expected => record['s3']['object']['size'])
          throw :skip_delete
        end
      end
    end
  end
end
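For reference, the message body that handle_message parses is a standard S3 event notification. A trimmed, illustrative example and how the fields used above map onto it (a sketch, not part of the plugin):

require "json"

example_body = <<-JSON
  {
    "Records": [
      {
        "eventSource": "aws:s3",
        "eventName": "ObjectCreated:Put",
        "s3": {
          "bucket": { "name": "my-elb-logs" },
          "object": { "key": "AWSLogs/123456789012/elasticloadbalancing/some-log.gz", "size": 1024 }
        }
      }
    ]
  }
JSON

record = JSON.parse(example_body)['Records'].first
record['s3']['bucket']['name']   # => "my-elb-logs"
record['s3']['object']['key']    # => "AWSLogs/123456789012/elasticloadbalancing/some-log.gz"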

#polling_options ⇒ Object



# File 'lib/logstash/inputs/sqs_s3.rb', line 146

def polling_options
  {
    # the number of messages to fetch in a single api call
    :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
    :attribute_names => SQS_ATTRIBUTES,
    # we will use the queue's setting, a good value is 10 seconds
    # (to ensure fast logstash shutdown on the one hand and few api calls on the other hand)
    :wait_time_seconds => nil,
    :skip_delete => @skip_delete
  }
end
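These options are handed to Aws::SQS::QueuePoller#poll in #run. With the SDK's queue poller, received messages are deleted after the block returns unless :skip_delete is true or the block throws :skip_delete, which is exactly what #handle_message does when a download or processing step fails. A minimal sketch of that interaction (process_ok? is a hypothetical helper):

poller.poll(:max_number_of_messages => MAX_MESSAGES_TO_FETCH) do |messages|
  messages.each do |message|
    # leave the batch on the queue (until the visibility timeout expires) on failure
    throw :skip_delete unless process_ok?(message)
  end
end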

#register ⇒ Object



# File 'lib/logstash/inputs/sqs_s3.rb', line 126

def register
  require "aws-sdk"
  @logger.info("Registering SQS input", :queue => @queue)
  @logger.info("Skip Delete", :skip_delete => @skip_delete)
  @current_load = 0.0
  @jsonCodec = LogStash::Codecs::JSON.new
  @plainCodec = LogStash::Codecs::Plain.new
  setup_queue
end

#run(queue) ⇒ Object



# File 'lib/logstash/inputs/sqs_s3.rb', line 253

def run(queue)
  # ensure we can stop logstash correctly
  poller.before_request do |stats|
    if stop? then
      @logger.warn("issuing :stop_polling on stop?", :queue => @queue)
      # this can take up to "Receive Message Wait Time" (of the sqs queue) seconds to be recognized
      throw :stop_polling
    end

    # Throttle requests if we are overloaded by big files
    if @current_load > @max_load_before_throttling/1000000 then
      throttle_seconds_sleep = @seconds_to_throttle * (@current_load / (@max_load_before_throttling.to_f/1000000)).floor
      @logger.warn("**********Current load has exceeded " + (@max_load_before_throttling.to_f/1000000).to_s + " MB. Load is currently: " + @current_load.to_s + ". Throttling back by " + throttle_seconds_sleep.to_s)

      # Cap the throttle time to 1 min
      if(throttle_seconds_sleep != 0) then
        if(throttle_seconds_sleep > 60) then
          sleep(60)
        else
          sleep(throttle_seconds_sleep)
        end
      end
    end

    # Reset load to 0
    @current_load = 0.0
  end
  # poll a message and process it
  run_with_backoff do
    poller.poll(polling_options) do |messages|
      messages.each do |message|
        handle_message(message, queue)
      end
    end
  end
end
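The throttling above is driven by @max_load_before_throttling (in bytes) and @seconds_to_throttle, whose configuration options are not shown in this excerpt. A worked example with assumed values of 50 MB and 5 seconds:

current_load = 120.0                              # MB handed to the pipeline since the last poll
max_mb       = 50_000_000 / 1_000_000.0           # => 50.0 MB threshold
sleep_time   = 5 * (current_load / max_mb).floor  # => 5 * 2 = 10 seconds
sleep_time   = 60 if sleep_time > 60              # capped at one minute, as in #run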

#setup_queue ⇒ Object



# File 'lib/logstash/inputs/sqs_s3.rb', line 136

def setup_queue
  aws_sqs_client = Aws::SQS::Client.new(aws_options_hash)
  queue_url = aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url]
  @poller = Aws::SQS::QueuePoller.new(queue_url, :client => aws_sqs_client)
  @s3 = Aws::S3::Client.new(aws_options_hash)
rescue Aws::SQS::Errors::ServiceError => e
  @logger.error("Cannot establish connection to Amazon SQS", :error => e)
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end