Class: LogStash::Inputs::SQS
- Inherits:
- Threadable (Object → Threadable → LogStash::Inputs::SQS)
- Includes:
- PluginMixins::AwsConfig
- Defined in:
- lib/logstash/inputs/sqs.rb
Overview
Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.
SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.
To use this plugin, you must:
* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume from the queue.
The “consumer” identity must have the following permissions on the queue:
* sqs:ChangeMessageVisibility
* sqs:ChangeMessageVisibilityBatch
* sqs:DeleteMessage
* sqs:DeleteMessageBatch
* sqs:GetQueueAttributes
* sqs:GetQueueUrl
* sqs:ListQueues
* sqs:ReceiveMessage
Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy is as follows:
{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:ReceiveMessage"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}
See aws.amazon.com/iam/ for more details on setting up AWS identities.
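As a quick sanity check, a policy document can be compared against the consumer permission list above before it is attached to a user. The following is a minimal sketch and not part of the plugin; `REQUIRED_ACTIONS` and `missing_actions` are hypothetical names introduced here for illustration, and wildcard actions such as `sqs:*` are deliberately not expanded.

```ruby
require "json"

# Permissions the consumer identity needs, as listed in the overview above.
REQUIRED_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Hypothetical helper: returns the required actions that no "Allow"
# statement in the given policy JSON grants. Wildcards are not expanded.
def missing_actions(policy_json)
  # "Statement" may be a single object or a list; normalise to a list.
  statements = [JSON.parse(policy_json).fetch("Statement", [])].flatten
  allowed = statements
    .select { |s| s["Effect"] == "Allow" }
    .flat_map { |s| Array(s["Action"]) }
  REQUIRED_ACTIONS - allowed
end

# Example policy that grants only two of the required actions.
policy = <<~JSON
  {
    "Statement": [
      {
        "Action": ["sqs:GetQueueUrl", "sqs:ReceiveMessage"],
        "Effect": "Allow",
        "Resource": ["arn:aws:sqs:us-east-1:123456789012:Logstash"]
      }
    ]
  }
JSON

puts missing_actions(policy).inspect
```

An empty result means every listed permission is granted explicitly by name.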
Instance Method Summary
- #aws_service_endpoint(region) ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #teardown ⇒ Object
Instance Method Details
#aws_service_endpoint(region) ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 81
    def aws_service_endpoint(region)
      return {
        :sqs_endpoint => "sqs.#{region}.amazonaws.com"
      }
    end
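For illustration, the method only interpolates the region name into the SQS hostname pattern. The sketch below copies the method body out of the class so it runs without Logstash; the region value is just an example.

```ruby
# Standalone copy of the method above, outside the plugin class.
def aws_service_endpoint(region)
  { :sqs_endpoint => "sqs.#{region}.amazonaws.com" }
end

endpoint = aws_service_endpoint("us-east-1")
puts endpoint[:sqs_endpoint] # prints "sqs.us-east-1.amazonaws.com"
```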
#register ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 88
    def register
      @logger.info("Registering SQS input", :queue => @queue)
      require "aws-sdk"

      @sqs = AWS::SQS.new()

      begin
        @logger.debug("Connecting to AWS SQS queue", :queue => @queue)
        @sqs_queue = @sqs.queues.named(@queue)
        @logger.info("Connected to AWS SQS queue successfully.", :queue => @queue)
      rescue Exception => e
        @logger.error("Unable to access SQS queue.", :error => e.to_s, :queue => @queue)
        raise e # re-raise the error; `throw` is for catch/throw, not exceptions
      end # begin/rescue
    end
#run(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 105
    def run(output_queue)
      @logger.debug("Polling SQS queue", :queue => @queue)

      receive_opts = {
        :limit => 10,
        :visibility_timeout => 30,
        :attributes => [:sent_at]
      }

      continue_polling = true
      while running? && continue_polling
        continue_polling = run_with_backoff(60, 1) do
          @sqs_queue.receive_message(receive_opts) do |message|
            if message
              @codec.decode(message.body) do |event|
                decorate(event)
                if @id_field
                  event[@id_field] = message.id
                end
                if @md5_field
                  event[@md5_field] = message.md5
                end
                if @sent_timestamp_field
                  event[@sent_timestamp_field] = LogStash::Timestamp.new(message.sent_timestamp).utc
                end
                @logger.debug? && @logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :sent_timestamp => message.sent_timestamp, :queue => @queue)
                output_queue << event
                message.delete
              end # codec.decode
            end # valid SQS message
          end # receive_message
        end # run_with_backoff
      end # polling loop
    end
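`run_with_backoff` is not documented on this page. The following is a minimal sketch of how a helper with that call shape could behave, under these assumptions: the first argument caps the sleep interval in seconds, the second is the initial interval, the interval doubles after each throttled attempt, and the return value tells the polling loop whether to continue. `ThrottledError` and the `sleeper` parameter are hypothetical and exist only to make the sketch runnable without AWS; the plugin's actual helper may differ.

```ruby
# Hypothetical error class standing in for a throttling error from AWS.
class ThrottledError < StandardError; end

# Sketch of a backoff helper. Assumptions (not taken from the plugin):
# max_time caps the sleep interval, sleep_time is the initial interval and
# doubles after each throttled attempt, and the return value tells the
# caller whether to keep polling. `sleeper` is injectable to ease testing.
def run_with_backoff(max_time, sleep_time, sleeper = Kernel.method(:sleep))
  loop do
    begin
      yield
      return true                           # block succeeded: keep polling
    rescue ThrottledError
      return false if sleep_time > max_time # give up: stop polling
      sleeper.call(sleep_time)
      sleep_time *= 2
    end
  end
end

attempts = 0
waits = []
# Record the requested waits instead of sleeping.
result = run_with_backoff(60, 1, ->(s) { waits << s }) do
  attempts += 1
  raise ThrottledError if attempts < 4      # fail three times, then succeed
end

puts result.inspect # prints "true"
puts waits.inspect  # prints "[1, 2, 4]"
```

Returning a boolean rather than raising lets the `while running? && continue_polling` loop exit cleanly once the retry budget is exhausted.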
#teardown ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 140
    def teardown
      @sqs_queue = nil
      finished
    end