Class: LogStash::Inputs::SQS
- Inherits: Threadable (hierarchy: Object, Threadable, LogStash::Inputs::SQS)
- Includes: PluginMixins::AwsConfig::V2
- Defined in: lib/logstash/inputs/sqs.rb
Overview
Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.
SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.
To use this plugin, you must:
* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume messages from the queue.
The “consumer” identity must have the following permissions on the queue:
* `sqs:ChangeMessageVisibility`
* `sqs:ChangeMessageVisibilityBatch`
* `sqs:DeleteMessage`
* `sqs:DeleteMessageBatch`
* `sqs:GetQueueAttributes`
* `sqs:GetQueueUrl`
* `sqs:ListQueues`
* `sqs:ReceiveMessage`
Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy is as follows:
    {
      "Statement": [
        {
          "Action": [
            "sqs:ChangeMessageVisibility",
            "sqs:ChangeMessageVisibilityBatch",
            "sqs:DeleteMessage",
            "sqs:DeleteMessageBatch",
            "sqs:GetQueueAttributes",
            "sqs:GetQueueUrl",
            "sqs:ListQueues",
            "sqs:ReceiveMessage"
          ],
          "Effect": "Allow",
          "Resource": [
            "arn:aws:sqs:us-east-1:123456789012:Logstash"
          ]
        }
      ]
    }
See aws.amazon.com/iam/ for more details on setting up AWS identities.
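As a rough illustration, the policy document can be generated from the consumer permission list above so the two stay in sync. This is only a sketch: `CONSUMER_ACTIONS` and `consumer_policy` are hypothetical names that are not part of the plugin, and the ARN is the sample value used on this page.

```ruby
require "json"

# Permissions the consumer identity needs, as listed above.
CONSUMER_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Hypothetical helper: builds an allow-policy for a single queue ARN.
def consumer_policy(queue_arn)
  {
    "Statement" => [
      {
        "Action" => CONSUMER_ACTIONS,
        "Effect" => "Allow",
        "Resource" => [queue_arn]
      }
    ]
  }
end

# Print the policy as JSON, ready to paste into the IAM console.
puts JSON.pretty_generate(consumer_policy("arn:aws:sqs:us-east-1:123456789012:Logstash"))
```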
Constant Summary
- MAX_TIME_BEFORE_GIVING_UP = 60
- MAX_MESSAGES_TO_FETCH = 10 (between 1 and 10 in the AWS-SDK doc)
- SENT_TIMESTAMP = "SentTimestamp"
- SQS_ATTRIBUTES = [SENT_TIMESTAMP]
- BACKOFF_SLEEP_TIME = 1
- BACKOFF_FACTOR = 2
- DEFAULT_POLLING_FREQUENCY = 20
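The three backoff-related constants suggest an exponential retry schedule. The sketch below shows one way such a schedule could work. It is not the plugin's `run_with_backoff`, whose body this page does not show. `retry_with_backoff` and its `sleeper` parameter are hypothetical, and the assumption that the constants are in seconds is mine.

```ruby
BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
MAX_TIME_BEFORE_GIVING_UP = 60

# Hypothetical retry helper: runs the block, and on failure waits
# BACKOFF_SLEEP_TIME, then multiplies the wait by BACKOFF_FACTOR each time.
# Once the next wait would exceed MAX_TIME_BEFORE_GIVING_UP, the error is re-raised.
# The sleeper is injectable so the schedule can be inspected without waiting.
def retry_with_backoff(sleeper: ->(seconds) { sleep(seconds) })
  delay = BACKOFF_SLEEP_TIME
  begin
    yield
  rescue StandardError
    raise if delay > MAX_TIME_BEFORE_GIVING_UP
    sleeper.call(delay)
    delay *= BACKOFF_FACTOR
    retry
  end
end
```

Under these assumptions a block that always fails is retried after waits of 1, 2, 4, 8, 16, and 32 before the error propagates.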
Instance Attribute Summary
- #poller ⇒ Object (readonly): Returns the value of attribute poller.
Instance Method Summary
- #add_sqs_data(event, message) ⇒ Object
- #decode_event(message) ⇒ Object
- #handle_message(message) ⇒ Object
- #polling_options ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #setup_queue ⇒ Object
Instance Attribute Details
#poller ⇒ Object (readonly)
Returns the value of attribute poller.
    # File 'lib/logstash/inputs/sqs.rb', line 98
    def poller
      @poller
    end
Instance Method Details
#add_sqs_data(event, message) ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 130
    def add_sqs_data(event, message)
      event.set(@id_field, message.message_id) if @id_field
      event.set(@md5_field, message.md5_of_body) if @md5_field
      event.set(@sent_timestamp_field, convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field
      return event
    end
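To illustrate the conditional copying that `#add_sqs_data` performs, here is a self-contained sketch using stand-in objects. `FakeMessage`, `FakeEvent`, and `copy_sqs_metadata` are hypothetical and exist only for this example. The conversion shown assumes that SQS reports `SentTimestamp` as epoch milliseconds, and uses a plain `Time` where the plugin would use its own timestamp type.

```ruby
SENT_TIMESTAMP = "SentTimestamp"

# Stand-in for an SQS message (the real object comes from the AWS SDK).
FakeMessage = Struct.new(:message_id, :md5_of_body, :attributes)

# Stand-in for a Logstash event that only supports #set and #get.
class FakeEvent
  def initialize
    @data = {}
  end

  def set(field, value)
    @data[field] = value
  end

  def get(field)
    @data[field]
  end
end

# Copies SQS metadata onto the event, skipping any field whose name is nil.
def copy_sqs_metadata(event, message, id_field: nil, md5_field: nil, sent_timestamp_field: nil)
  event.set(id_field, message.message_id) if id_field
  event.set(md5_field, message.md5_of_body) if md5_field
  if sent_timestamp_field
    # Assumption: SentTimestamp is a string holding epoch milliseconds.
    millis = message.attributes[SENT_TIMESTAMP].to_i
    event.set(sent_timestamp_field, Time.at(millis / 1000).utc)
  end
  event
end
```

Only the fields whose names are configured end up on the event, so leaving a field name unset costs nothing at runtime.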
#decode_event(message) ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 124
    def decode_event(message)
      @codec.decode(message.body) do |event|
        return event
      end
    end
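Because `#decode_event` returns from inside the codec's block, only the first decoded event leaves the method. The sketch below demonstrates that Ruby behavior with a hypothetical `LineCodec` that yields one event per line. Neither `LineCodec` nor `first_decoded_event` is part of the plugin, and the explicit `nil` fallback is an addition for clarity.

```ruby
# Hypothetical codec that yields one "event" per line of the payload.
class LineCodec
  def decode(payload)
    payload.each_line { |line| yield line.chomp }
  end
end

# Returns the first event the codec yields, or nil if it yields nothing.
def first_decoded_event(codec, body)
  codec.decode(body) do |event|
    # A return inside a block exits the enclosing method immediately,
    # so any later events from the same body are never seen.
    return event
  end
  nil
end
```

With a codec that emits several events per message body, a pattern like this silently drops everything after the first one.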
#handle_message(message) ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 138
    def handle_message(message)
      event = decode_event(message)
      add_sqs_data(event, message)
      decorate(event)
      return event
    end
#polling_options ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 116
    def polling_options
      {
        :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
        :attribute_names => SQS_ATTRIBUTES,
        :wait_time_seconds => @polling_frequency
      }
    end
#register ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 100
    def register
      require "aws-sdk"
      @logger.info("Registering SQS input", :queue => @queue)

      setup_queue
    end
#run(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 145
    def run(output_queue)
      @logger.debug("Polling SQS queue", :polling_options => polling_options)

      run_with_backoff do
        poller.poll(polling_options) do |messages, stats|
          break if stop?
          messages.each do |message|
            output_queue << handle_message(message)
          end
          @logger.debug("SQS Stats:", :request_count => stats.request_count,
                        :received_message_count => stats.received_message_count,
                        :last_message_received_at => stats.last_message_received_at) if @logger.debug?
        end
      end
    end
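The control flow of `#run` (poll in batches, check a stop condition before each batch, forward every message) can be tried out without AWS access. In the sketch below, `FakePoller`, `Stats`, and `drain` are hypothetical stand-ins. The `stop_after` argument plays the role of `stop?`, and `upcase` plays the role of the per-message handler.

```ruby
# Minimal stats object carrying the two counters this sketch uses.
Stats = Struct.new(:request_count, :received_message_count)

# Hypothetical stand-in for the SDK poller: yields pre-built batches.
class FakePoller
  def initialize(batches)
    @batches = batches
  end

  def poll(_options)
    received = 0
    @batches.each_with_index do |batch, index|
      received += batch.size
      yield batch, Stats.new(index + 1, received)
    end
  end
end

# Forwards handled messages to output_queue until the stop condition is met.
def drain(poller, output_queue, options, stop_after:)
  poller.poll(options) do |messages, stats|
    # Stands in for `break if stop?`: leaving the block ends the poll call.
    break if stats.request_count > stop_after
    messages.each { |message| output_queue << message.upcase }
  end
  output_queue
end
```

Because the stop check runs before the batch is processed, a batch that arrives after the stop condition becomes true is not forwarded.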
#setup_queue ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 107
    def setup_queue
      aws_sqs_client = Aws::SQS::Client.new(aws_options_hash)
      queue_url = aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url]
      @poller = Aws::SQS::QueuePoller.new(queue_url, :client => aws_sqs_client)
    rescue Aws::SQS::Errors::ServiceError => e
      @logger.error("Cannot establish connection to Amazon SQS", :error => e)
      raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
    end