Class: LogStash::Inputs::SQS

Inherits:
Threadable
  • Object
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/inputs/sqs.rb

Overview

Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.

Although SQS is similar to other queuing systems such as those based on AMQP, it uses a custom API and requires an AWS account. See aws.amazon.com/sqs/ for details on how SQS works, how it is priced, and how to set up a queue.

To use this plugin, you must:

* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume messages from the queue.

The “consumer” identity must have the following permissions on the queue:

* `sqs:ChangeMessageVisibility`
* `sqs:ChangeMessageVisibilityBatch`
* `sqs:DeleteMessage`
* `sqs:DeleteMessageBatch`
* `sqs:GetQueueAttributes`
* `sqs:GetQueueUrl`
* `sqs:ListQueues`
* `sqs:ReceiveMessage`

Typically, you should set up an IAM policy, create a user, and apply the IAM policy to that user. A sample policy follows:


{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:ReceiveMessage"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}

See aws.amazon.com/iam/ for more details on setting up AWS identities.
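
The policy document can also be generated from the permission list so that the two stay in sync. The following is a minimal sketch that uses only Ruby's standard `json` library. `CONSUMER_ACTIONS` and `consumer_policy` are hypothetical names introduced for illustration, and the queue ARN is the placeholder from the sample policy.

```ruby
require "json"

# Permissions the "consumer" identity needs on the queue, as listed above.
CONSUMER_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Hypothetical helper: builds a policy hash that allows the consumer
# actions on a single queue identified by its ARN.
def consumer_policy(queue_arn)
  {
    "Statement" => [
      {
        "Action"   => CONSUMER_ACTIONS,
        "Effect"   => "Allow",
        "Resource" => [queue_arn]
      }
    ]
  }
end

puts JSON.pretty_generate(consumer_policy("arn:aws:sqs:us-east-1:123456789012:Logstash"))
```

The printed JSON can be pasted into the IAM console or stored alongside the Logstash configuration.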

Constant Summary

MAX_TIME_BEFORE_GIVING_UP = 60
MAX_MESSAGES_TO_FETCH = 10  (between 1 and 10, per the AWS SDK documentation)
SENT_TIMESTAMP = "SentTimestamp"
SQS_ATTRIBUTES = [SENT_TIMESTAMP]
BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
DEFAULT_POLLING_FREQUENCY = 20

Instance Attribute Details

#poller ⇒ Object (readonly)

Returns the value of attribute poller.



# File 'lib/logstash/inputs/sqs.rb', line 98

def poller
  @poller
end

Instance Method Details

#add_sqs_data(event, message) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 130

def add_sqs_data(event, message)
  event.set(@id_field, message.message_id) if @id_field
  event.set(@md5_field, message.md5_of_body) if @md5_field
  event.set(@sent_timestamp_field, convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field

  return event
end
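
`convert_epoch_to_timestamp` is not shown on this page. SQS reports `SentTimestamp` as a string holding milliseconds since the Unix epoch, so some conversion along the following lines is needed. This is a sketch only: `epoch_ms_to_time` is a hypothetical stand-in that returns a plain Ruby `Time`, and the plugin's own helper may return a different timestamp type.

```ruby
# Hypothetical stand-in for the conversion step: accepts epoch milliseconds
# (as a String or Integer) and returns a UTC Time with millisecond precision.
def epoch_ms_to_time(epoch_ms)
  seconds, millis = epoch_ms.to_i.divmod(1000)
  Time.at(seconds, millis, :millisecond).utc
end

sent = epoch_ms_to_time("1500000000123")
puts sent.to_i  # whole seconds since the epoch
puts sent.usec  # sub-second part, in microseconds
```

Splitting with `divmod` keeps the arithmetic in integers, which avoids the rounding that a floating-point division by 1000 could introduce.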

#decode_event(message) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 124

def decode_event(message)
  @codec.decode(message.body) do |event|
    return event
  end
end
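
Because the block calls `return` on the first yielded event, `decode_event` leaves the method as soon as the codec produces one event, and any further events from the same message body are not used. The sketch below reproduces that control flow with a hypothetical stub codec. `LineCodecStub` and `first_decoded_event` are illustrative names and are not part of the plugin.

```ruby
# Hypothetical codec stub: yields one "event" (here just a String) per line.
class LineCodecStub
  def decode(data)
    data.each_line { |line| yield line.chomp }
  end
end

# Same shape as decode_event: `return` inside the block exits the whole
# method on the first yield, so later events are never seen.
def first_decoded_event(codec, body)
  codec.decode(body) do |event|
    return event
  end
end

puts first_decoded_event(LineCodecStub.new, "first\nsecond\n")
```

With a codec that emits several events per message, only the first one would reach the output queue under this pattern.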

#handle_message(message) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 138

def handle_message(message)
  event = decode_event(message)
  add_sqs_data(event, message)
  decorate(event)
  return event
end

#polling_options ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 116

def polling_options
  { 
    :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
    :attribute_names => SQS_ATTRIBUTES,
    :wait_time_seconds => @polling_frequency
  }
end

#register ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 100

def register
  require "aws-sdk"
  @logger.info("Registering SQS input", :queue => @queue)

  setup_queue
end

#run(output_queue) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 145

def run(output_queue)
  @logger.debug("Polling SQS queue", :polling_options => polling_options)

  run_with_backoff do
    poller.poll(polling_options) do |messages, stats|
      break if stop?

      messages.each do |message|
        output_queue << handle_message(message)
      end

      @logger.debug("SQS Stats:", :request_count => stats.request_count,
                    :received_message_count => stats.received_message_count,
                    :last_message_received_at => stats.last_message_received_at) if @logger.debug?
    end
  end
end
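
`run` wraps the polling loop in `run_with_backoff`, whose body is not shown on this page. The constants `BACKOFF_SLEEP_TIME`, `BACKOFF_FACTOR`, and `MAX_TIME_BEFORE_GIVING_UP` suggest an exponential backoff. The following is a minimal sketch of that general technique, not the plugin's implementation: `retry_with_backoff` and its keyword arguments are hypothetical, the defaults simply mirror the constant values, and the give-up rule (re-raise once the delay exceeds `max_time`) is an assumption.

```ruby
# Hypothetical helper: runs the block and, on failure, sleeps and retries with
# a delay that grows by `factor` each time. Once the next delay would exceed
# `max_time`, the last error is re-raised instead of retrying.
# Defaults mirror BACKOFF_SLEEP_TIME (1), BACKOFF_FACTOR (2) and
# MAX_TIME_BEFORE_GIVING_UP (60).
def retry_with_backoff(initial: 1, factor: 2, max_time: 60, sleeper: ->(s) { sleep(s) })
  delay = initial
  begin
    yield
  rescue StandardError
    raise if delay > max_time
    sleeper.call(delay)
    delay *= factor
    retry
  end
end

# Usage: a block that fails twice before succeeding. The sleeper is replaced
# with a recorder so the example runs instantly.
attempts = 0
delays = []
result = retry_with_backoff(sleeper: ->(s) { delays << s }) do
  attempts += 1
  raise "transient failure" if attempts < 3
  :polled
end
puts "result=#{result} attempts=#{attempts} delays=#{delays.inspect}"
```

Injecting the sleeper keeps the helper testable. In a real input plugin the rescue clause would normally be narrowed to the service's error class and would also check `stop?` before sleeping.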

#setup_queue ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 107

def setup_queue
  aws_sqs_client = Aws::SQS::Client.new(aws_options_hash)
  queue_url = aws_sqs_client.get_queue_url(:queue_name =>  @queue)[:queue_url]
  @poller = Aws::SQS::QueuePoller.new(queue_url, :client => aws_sqs_client)
rescue Aws::SQS::Errors::ServiceError => e
  @logger.error("Cannot establish connection to Amazon SQS", :error => e)
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end