Class: LogStash::Inputs::SQS

Inherits:
Threadable
  • Object
Includes:
PluginMixins::AwsConfig
Defined in:
lib/logstash/inputs/sqs.rb

Overview

Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.

Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.

To use this plugin, you must:

* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume messages from the queue.

The “consumer” identity must have the following permissions on the queue:

* sqs:ChangeMessageVisibility
* sqs:ChangeMessageVisibilityBatch
* sqs:DeleteMessage
* sqs:DeleteMessageBatch
* sqs:GetQueueAttributes
* sqs:GetQueueUrl
* sqs:ListQueues
* sqs:ReceiveMessage

Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy is as follows:

{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:ReceiveMessage"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}
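
A policy document can be checked against the permission list above before it is attached to a user. The following is a minimal sketch using only Ruby's standard library; `REQUIRED_ACTIONS` and `missing_sqs_actions` are hypothetical names introduced here for illustration, and the check does not expand wildcards such as `sqs:*`.

```ruby
require "json"

# Permissions the consumer identity needs, copied from the list above.
REQUIRED_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Hypothetical helper: returns the required actions that no "Allow"
# statement in the policy grants. Wildcards are not expanded.
def missing_sqs_actions(policy_json)
  # "Statement" may be a single object or an array of objects.
  statements = [JSON.parse(policy_json)["Statement"]].flatten.compact
  allowed = statements
    .select { |s| s["Effect"] == "Allow" }
    .flat_map { |s| Array(s["Action"]) }
  REQUIRED_ACTIONS - allowed
end

# A deliberately incomplete policy, to show what the check reports.
policy = <<~JSON
  {
    "Statement": [
      {
        "Action": ["sqs:GetQueueUrl", "sqs:ReceiveMessage"],
        "Effect": "Allow",
        "Resource": ["arn:aws:sqs:us-east-1:123456789012:Logstash"]
      }
    ]
  }
JSON

puts missing_sqs_actions(policy)
```

An empty result means every listed permission appears in at least one "Allow" statement; it does not verify that the "Resource" entries match your queue.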

See aws.amazon.com/iam/ for more details on setting up AWS identities.

Instance Method Details

#aws_service_endpoint(region) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 81

def aws_service_endpoint(region)
  return {
      :sqs_endpoint => "sqs.#{region}.amazonaws.com"
  }
end
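
As a quick illustration of the return value, the method body can be copied into a standalone function and called with a region name. This sketch runs outside the plugin class, and the region string is only an example.

```ruby
# Standalone copy of the method body, outside the plugin class, for illustration.
def aws_service_endpoint(region)
  { :sqs_endpoint => "sqs.#{region}.amazonaws.com" }
end

endpoint = aws_service_endpoint("us-east-1")
puts endpoint[:sqs_endpoint]  # prints "sqs.us-east-1.amazonaws.com"
```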

#register ⇒ Object

# File 'lib/logstash/inputs/sqs.rb', line 88

def register
  @logger.info("Registering SQS input", :queue => @queue)
  require "aws-sdk"

  @sqs = AWS::SQS.new(aws_options_hash)

  begin
    @logger.debug("Connecting to AWS SQS queue", :queue => @queue)
    @sqs_queue = @sqs.queues.named(@queue)
    @logger.info("Connected to AWS SQS queue successfully.", :queue => @queue)
  rescue Exception => e
    @logger.error("Unable to access SQS queue.", :error => e.to_s, :queue => @queue)
    raise e # re-raise the error; Ruby's throw is for catch/throw, not exceptions
  end # begin/rescue
end

#run(output_queue) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 105

def run(output_queue)
  @logger.debug("Polling SQS queue", :queue => @queue)

  receive_opts = {
      :limit => 10,
      :visibility_timeout => 30,
      :attributes => [:sent_at]
  }

  continue_polling = true
  while running? && continue_polling
    continue_polling = run_with_backoff(60, 1) do
      @sqs_queue.receive_message(receive_opts) do |message|
        if message
          @codec.decode(message.body) do |event|
            decorate(event)
            if @id_field
              event[@id_field] = message.id
            end
            if @md5_field
              event[@md5_field] = message.md5
            end
            if @sent_timestamp_field
              event[@sent_timestamp_field] = LogStash::Timestamp.new(message.sent_timestamp).utc
            end
            @logger.debug? && @logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :sent_timestamp => message.sent_timestamp, :queue => @queue)
            output_queue << event
            message.delete
          end # codec.decode
        end # valid SQS message
      end # receive_message
    end # run_with_backoff
  end # polling loop
end
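
The `run_with_backoff(60, 1)` helper called above is not shown on this page. Judging only by how it is called, it appears to run the block and return a value that decides whether polling continues. The following is a minimal sketch of a retry-with-doubling-delay pattern that fits that call shape; it is not the plugin's implementation. `ThrottledError`, `with_backoff_sketch`, the doubling factor, and the injectable `sleeper` are all assumptions made for illustration.

```ruby
# Hypothetical error standing in for whatever throttling error the SDK raises.
class ThrottledError < StandardError; end

# Calls the block. On ThrottledError it waits sleep_time seconds, doubles the
# delay, and tries again. Returns true once the block succeeds, or false once
# the next delay would exceed max_time. `sleeper` is injectable so the sketch
# can be exercised without real waiting.
def with_backoff_sketch(max_time, sleep_time, sleeper = Kernel.method(:sleep))
  while sleep_time <= max_time
    begin
      yield
      return true
    rescue ThrottledError
      sleeper.call(sleep_time)
      sleep_time *= 2
    end
  end
  false
end

# Usage: fail three times, then succeed; record delays instead of sleeping.
attempts = 0
delays = []
ok = with_backoff_sketch(60, 1, ->(seconds) { delays << seconds }) do
  attempts += 1
  raise ThrottledError if attempts < 4
end
puts ok             # prints "true"
puts delays.inspect # prints "[1, 2, 4]"
```

Returning a boolean rather than raising lets the caller use the result directly as a loop condition, which matches how `continue_polling` is assigned in `run`.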

#teardown ⇒ Object

# File 'lib/logstash/inputs/sqs.rb', line 140

def teardown
  @sqs_queue = nil
  finished
end