Class: LogStash::Inputs::SQS
- Inherits:
- Threadable
- Object
- Threadable
- LogStash::Inputs::SQS
- Includes:
- PluginMixins::AwsConfig::V2
- Defined in:
- lib/logstash/inputs/sqs.rb
Overview
Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.
SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.
To use this plugin, you must:
* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume from the queue.
The “consumer” identity must have the following permissions on the queue:
* `sqs:ChangeMessageVisibility`
* `sqs:ChangeMessageVisibilityBatch`
* `sqs:DeleteMessage`
* `sqs:DeleteMessageBatch`
* `sqs:GetQueueAttributes`
* `sqs:GetQueueUrl`
* `sqs:ListQueues`
* `sqs:ReceiveMessage`
Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy granting the consumer permissions listed above is as follows:
{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:ReceiveMessage"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}
See aws.amazon.com/iam/ for more details on setting up AWS identities.
Constant Summary
- MAX_TIME_BEFORE_GIVING_UP = 60
- MAX_MESSAGES_TO_FETCH = 10 (between 1 and 10 in the AWS-SDK doc)
- SENT_TIMESTAMP = "SentTimestamp"
- SQS_ATTRIBUTES = [SENT_TIMESTAMP]
- BACKOFF_SLEEP_TIME = 1
- BACKOFF_FACTOR = 2
- DEFAULT_POLLING_FREQUENCY = 20
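The names BACKOFF_SLEEP_TIME, BACKOFF_FACTOR, and MAX_TIME_BEFORE_GIVING_UP suggest an exponential backoff around polling. This page does not show the body of `run_with_backoff`, so the following is only a sketch of how such constants are commonly combined. The `with_backoff` helper and its `sleeper` parameter are hypothetical and not part of the plugin.

```ruby
BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
MAX_TIME_BEFORE_GIVING_UP = 60

# Hypothetical helper (not the plugin's code): retries the block,
# multiplying the delay by BACKOFF_FACTOR after each failure, and
# re-raises once the next delay would exceed MAX_TIME_BEFORE_GIVING_UP.
# `sleeper` is injectable so the sketch can be exercised without waiting.
def with_backoff(sleeper: ->(seconds) { sleep(seconds) })
  delay = BACKOFF_SLEEP_TIME
  begin
    yield
  rescue StandardError
    raise if delay > MAX_TIME_BEFORE_GIVING_UP
    sleeper.call(delay)
    delay *= BACKOFF_FACTOR
    retry
  end
end
```

Under these assumptions, a block that keeps failing is retried after delays of 1, 2, 4, 8, 16, and 32 seconds before the error is re-raised.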
Instance Attribute Summary
- #poller ⇒ Object (readonly)
Returns the value of attribute poller.
Instance Method Summary
- #add_sqs_data(event, message) ⇒ Object
- #handle_message(message, output_queue) ⇒ Object
- #polling_options ⇒ Object
- #queue_url(aws_sqs_client) ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #setup_queue ⇒ Object
Instance Attribute Details
#poller ⇒ Object (readonly)
Returns the value of attribute poller.
# File 'lib/logstash/inputs/sqs.rb', line 96

def poller
  @poller
end
Instance Method Details
#add_sqs_data(event, message) ⇒ Object
# File 'lib/logstash/inputs/sqs.rb', line 132

def add_sqs_data(event, message)
  event.set(@id_field, message.message_id) if @id_field
  event.set(@md5_field, message.md5_of_body) if @md5_field
  event.set(@sent_timestamp_field, convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field
  event
end
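The sent timestamp is read from `attributes[SENT_TIMESTAMP]`. SQS reports `SentTimestamp` as a string holding milliseconds since the Unix epoch, so it has to be converted before it is useful as a time value. The plugin's own conversion helper is not shown on this page. The function below, `epoch_millis_to_time`, is a hypothetical stand-in that returns a plain Ruby `Time` rather than a Logstash timestamp.

```ruby
# Hypothetical stand-in for the conversion applied to SentTimestamp.
# Accepts the attribute as SQS delivers it: a string (or integer) of
# milliseconds since the Unix epoch. Rational avoids float rounding.
def epoch_millis_to_time(millis)
  Time.at(Rational(millis.to_i, 1000)).utc
end

sent_at = epoch_millis_to_time("1500000000123")
puts sent_at.to_i   # whole seconds since the epoch
puts sent_at.usec   # sub-second part, in microseconds
```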
#handle_message(message, output_queue) ⇒ Object
# File 'lib/logstash/inputs/sqs.rb', line 139

def handle_message(message, output_queue)
  @codec.decode(message.body) do |event|
    add_sqs_data(event, message)
    decorate(event)
    output_queue << event
  end
end
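Because the codec yields once per decoded event, a single SQS message can push several events onto the output queue. The sketch below illustrates that flow with stand-ins only: `LineCodec`, `Message`, and `handle` are hypothetical, events are plain hashes instead of Logstash events, and the output queue is an array.

```ruby
# Hypothetical codec: yields one hash "event" per line of the body.
class LineCodec
  def decode(body)
    body.each_line { |line| yield({ "message" => line.chomp }) }
  end
end

# Minimal stand-in for an SQS message.
Message = Struct.new(:message_id, :body)

# Mirrors the decode / enrich / enqueue shape of #handle_message.
def handle(message, codec, output_queue)
  codec.decode(message.body) do |event|
    event["message_id"] = message.message_id  # stands in for add_sqs_data
    output_queue << event
  end
end

queue = []
handle(Message.new("m-1", "first\nsecond\n"), LineCodec.new, queue)
puts queue.length
```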
#polling_options ⇒ Object
# File 'lib/logstash/inputs/sqs.rb', line 124

def polling_options
  {
    :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
    :attribute_names => SQS_ATTRIBUTES,
    :wait_time_seconds => @polling_frequency
  }
end
#queue_url(aws_sqs_client) ⇒ Object
# File 'lib/logstash/inputs/sqs.rb', line 105

def queue_url(aws_sqs_client)
  if @queue_owner_aws_account_id
    return aws_sqs_client.get_queue_url({:queue_name => @queue, :queue_owner_aws_account_id => @queue_owner_aws_account_id})[:queue_url]
  else
    return aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url]
  end
end
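The only difference between the two branches of `#queue_url` is whether `:queue_owner_aws_account_id` is included in the request, which is what allows a queue owned by another account to be resolved. The sketch below makes that visible without calling AWS. `FakeSqsClient`, `resolve_queue_url`, and the `sqs.example.invalid` URL are hypothetical and exist only for illustration.

```ruby
# Hypothetical stand-in for Aws::SQS::Client that records the parameters
# passed to get_queue_url instead of contacting AWS.
class FakeSqsClient
  attr_reader :last_params

  def get_queue_url(params)
    @last_params = params
    owner = params[:queue_owner_aws_account_id] || "self"
    { :queue_url => "https://sqs.example.invalid/#{owner}/#{params[:queue_name]}" }
  end
end

# Same branching as #queue_url, with the instance variables passed in
# as arguments so the sketch is self-contained.
def resolve_queue_url(client, queue, owner_account_id = nil)
  params = { :queue_name => queue }
  params[:queue_owner_aws_account_id] = owner_account_id if owner_account_id
  client.get_queue_url(params)[:queue_url]
end

client = FakeSqsClient.new
puts resolve_queue_url(client, "Logstash")
puts resolve_queue_url(client, "Logstash", "123456789012")
```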
#register ⇒ Object
# File 'lib/logstash/inputs/sqs.rb', line 98

def register
  require "aws-sdk-sqs"
  @logger.info("Registering SQS input", :queue => @queue, :queue_owner_aws_account_id => @queue_owner_aws_account_id)

  setup_queue
end
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/sqs.rb', line 147

def run(output_queue)
  @logger.debug("Polling SQS queue", :polling_options => polling_options)

  run_with_backoff do
    poller.poll(polling_options) do |messages, stats|
      break if stop?
      messages.each {|message| handle_message(message, output_queue) }
      @logger.debug("SQS Stats:", :request_count => stats.request_count,
                    :received_message_count => stats.received_message_count,
                    :last_message_received_at => stats.last_message_received_at) if @logger.debug?
    end
  end
end
#setup_queue ⇒ Object
# File 'lib/logstash/inputs/sqs.rb', line 113

def setup_queue
  aws_sqs_client = Aws::SQS::Client.new(aws_options_hash || {})
  poller = Aws::SQS::QueuePoller.new(queue_url(aws_sqs_client), :client => aws_sqs_client)
  poller.before_request { |stats| throw :stop_polling if stop? }

  @poller = poller
rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
  @logger.error("Cannot establish connection to Amazon SQS", exception_details(e))
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end
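`#setup_queue` registers a `before_request` hook that throws `:stop_polling` once the plugin is asked to stop. `Aws::SQS::QueuePoller` catches that symbol to leave its polling loop. The sketch below reproduces the catch/throw mechanism in plain Ruby. `TinyPoller` is a hypothetical miniature, not the SDK class, and it hands the block a request counter instead of messages.

```ruby
# Hypothetical miniature poller: loops until the before_request hook
# throws :stop_polling, then returns the number of requests made.
class TinyPoller
  def before_request(&block)
    @before_request = block
  end

  def poll
    requests = 0
    catch(:stop_polling) do
      loop do
        # The hook runs before every request and may abort the loop.
        @before_request.call(requests) if @before_request
        requests += 1
        yield requests
      end
    end
    requests
  end
end

stop_requested = false
poller = TinyPoller.new
poller.before_request { |_count| throw :stop_polling if stop_requested }

# Ask for a stop after the third request; the hook ends the loop
# before a fourth request is made.
total = poller.poll { |n| stop_requested = true if n == 3 }
puts total
```

Checking the stop flag in the hook means the loop ends before the next request is issued rather than in the middle of processing a batch.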