Class: LogStash::Inputs::SQS
- Inherits:
-
Threadable
- Object
- Threadable
- LogStash::Inputs::SQS
- Includes:
- PluginMixins::AwsConfig::V2
- Defined in:
- lib/logstash/inputs/sqs.rb
Overview
Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.
SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like and how to setup a queue.
To use this plugin, you must:
* Have an AWS account
* Setup an SQS queue
* Create an identify that has access to consume messages from the queue.
The “consumer” identity must have the following permissions on the queue:
* `sqs:ChangeMessageVisibility`
* `sqs:ChangeMessageVisibilityBatch`
* `sqs:DeleteMessage`
* `sqs:DeleteMessageBatch`
* `sqs:GetQueueAttributes`
* `sqs:GetQueueUrl`
* `sqs:ListQueues`
* `sqs:ReceiveMessage`
Typically, you should setup an IAM policy, create a user and apply the IAM policy to the user. A sample policy is as follows:
- source,json
-
{
"Statement": [ { "Action": [ "sqs:ChangeMessageVisibility", "sqs:ChangeMessageVisibilityBatch", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ListQueues", "sqs:SendMessage", "sqs:SendMessageBatch" ], "Effect": "Allow", "Resource": [ "arn:aws:sqs:us-east-1:123456789012:Logstash" ] } ]
}
See aws.amazon.com/iam/ for more details on setting up AWS identities.
Constant Summary collapse
- MAX_TIME_BEFORE_GIVING_UP =
60
- MAX_MESSAGES_TO_FETCH =
Between 1-10 in the AWS-SDK doc
10
- SENT_TIMESTAMP =
"SentTimestamp"
- SQS_ATTRIBUTES =
[SENT_TIMESTAMP]
- BACKOFF_SLEEP_TIME =
1
- BACKOFF_FACTOR =
2
- DEFAULT_POLLING_FREQUENCY =
20
Instance Attribute Summary collapse
-
#poller ⇒ Object
readonly
Returns the value of attribute poller.
Instance Method Summary collapse
- #add_sqs_data(event, message) ⇒ Object
- #handle_message(message, output_queue) ⇒ Object
- #polling_options ⇒ Object
- #queue_url(aws_sqs_client) ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #setup_queue ⇒ Object
Instance Attribute Details
#poller ⇒ Object (readonly)
Returns the value of attribute poller.
103 104 105 |
# File 'lib/logstash/inputs/sqs.rb', line 103 def poller @poller end |
Instance Method Details
#add_sqs_data(event, message) ⇒ Object
139 140 141 142 143 144 |
# File 'lib/logstash/inputs/sqs.rb', line 139 def add_sqs_data(event, ) event.set(@id_field, .) if @id_field event.set(@md5_field, .md5_of_body) if @md5_field event.set(@sent_timestamp_field, (.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field event end |
#handle_message(message, output_queue) ⇒ Object
146 147 148 149 150 151 152 |
# File 'lib/logstash/inputs/sqs.rb', line 146 def (, output_queue) @codec.decode(.body) do |event| add_sqs_data(event, ) decorate(event) output_queue << event end end |
#polling_options ⇒ Object
131 132 133 134 135 136 137 |
# File 'lib/logstash/inputs/sqs.rb', line 131 def { :max_number_of_messages => MAX_MESSAGES_TO_FETCH, :attribute_names => SQS_ATTRIBUTES, :wait_time_seconds => @polling_frequency } end |
#queue_url(aws_sqs_client) ⇒ Object
112 113 114 115 116 117 118 |
# File 'lib/logstash/inputs/sqs.rb', line 112 def queue_url(aws_sqs_client) if @queue_owner_aws_account_id return aws_sqs_client.get_queue_url({:queue_name => @queue, :queue_owner_aws_account_id => @queue_owner_aws_account_id})[:queue_url] else return aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url] end end |
#register ⇒ Object
105 106 107 108 109 110 |
# File 'lib/logstash/inputs/sqs.rb', line 105 def register require "aws-sdk" @logger.info("Registering SQS input", :queue => @queue, :queue_owner_aws_account_id => @queue_owner_aws_account_id) setup_queue end |
#run(output_queue) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/logstash/inputs/sqs.rb', line 154 def run(output_queue) @logger.debug("Polling SQS queue", :polling_options => ) run_with_backoff do poller.poll() do |, stats| break if stop? .each {|| (, output_queue) } @logger.debug("SQS Stats:", :request_count => stats.request_count, :received_message_count => stats., :last_message_received_at => stats.) if @logger.debug? end end end |
#setup_queue ⇒ Object
120 121 122 123 124 125 126 127 128 129 |
# File 'lib/logstash/inputs/sqs.rb', line 120 def setup_queue aws_sqs_client = Aws::SQS::Client.new( || {}) poller = Aws::SQS::QueuePoller.new(queue_url(aws_sqs_client), :client => aws_sqs_client) poller.before_request { |stats| throw :stop_polling if stop? } @poller = poller rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e @logger.error("Cannot establish connection to Amazon SQS", exception_details(e)) raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials" end |