Class: LogStash::Inputs::SQS

Inherits:
Threadable
  • Object
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/inputs/sqs.rb

Overview

Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.

Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.

To use this plugin, you must:

* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume messages from the queue

The “consumer” identity must have the following permissions on the queue:

* `sqs:ChangeMessageVisibility`
* `sqs:ChangeMessageVisibilityBatch`
* `sqs:DeleteMessage`
* `sqs:DeleteMessageBatch`
* `sqs:GetQueueAttributes`
* `sqs:GetQueueUrl`
* `sqs:ListQueues`
* `sqs:ReceiveMessage`

Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy is as follows:

{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:ReceiveMessage"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}
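
The permission list above can also be turned into a policy document programmatically, which keeps the policy and the list from drifting apart. The following is a minimal sketch, not part of the plugin: `CONSUMER_ACTIONS` and `consumer_policy` are hypothetical names, and the queue ARN is the placeholder used in the sample policy.

```ruby
require "json"

# Permissions the "consumer" identity needs, copied from the list above.
CONSUMER_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Hypothetical helper: builds a policy hash that allows the consumer
# actions on a single queue ARN.
def consumer_policy(queue_arn)
  {
    "Statement" => [
      {
        "Action" => CONSUMER_ACTIONS,
        "Effect" => "Allow",
        "Resource" => [queue_arn]
      }
    ]
  }
end

# Placeholder ARN taken from the sample policy.
puts JSON.pretty_generate(consumer_policy("arn:aws:sqs:us-east-1:123456789012:Logstash"))
```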

See aws.amazon.com/iam/ for more details on setting up AWS identities.

Constant Summary

MAX_TIME_BEFORE_GIVING_UP = 60
MAX_MESSAGES_TO_FETCH = 10 # Between 1 and 10, per the AWS SDK documentation
SENT_TIMESTAMP = "SentTimestamp"
SQS_ATTRIBUTES = [SENT_TIMESTAMP]
BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
DEFAULT_POLLING_FREQUENCY = 20
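
The three backoff constants suggest an exponential retry schedule. The plugin's own `run_with_backoff` is not shown on this page, so the following is only a sketch of one way such constants could be combined, not the plugin's implementation. `with_backoff` and its `sleeper` parameter are hypothetical; the injectable sleeper exists so the schedule can be observed without actually waiting.

```ruby
BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
MAX_TIME_BEFORE_GIVING_UP = 60

# Hypothetical helper: runs the block, and on failure waits `delay` seconds,
# multiplies the delay by BACKOFF_FACTOR, and retries. Once the next delay
# would exceed max_time, the last error is re-raised.
def with_backoff(sleeper: ->(seconds) { sleep(seconds) }, max_time: MAX_TIME_BEFORE_GIVING_UP)
  delay = BACKOFF_SLEEP_TIME
  begin
    yield
  rescue StandardError => e
    raise e if delay > max_time
    sleeper.call(delay)
    delay *= BACKOFF_FACTOR
    retry
  end
end

# Usage: record the delays instead of sleeping.
attempts = 0
delays = []
result = with_backoff(sleeper: ->(seconds) { delays << seconds }) do
  attempts += 1
  raise "transient failure" if attempts < 4
  :ok
end
p result # => :ok
p delays # => [1, 2, 4]
```

With these values, a block that never succeeds would be retried after 1, 2, 4, 8, 16, and 32 seconds before the error is re-raised.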

Instance Attribute Details

#poller ⇒ Object (readonly)

Returns the value of attribute poller.



# File 'lib/logstash/inputs/sqs.rb', line 103

def poller
  @poller
end

Instance Method Details

#add_sqs_data(event, message) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 139

def add_sqs_data(event, message)
  event.set(@id_field, message.message_id) if @id_field
  event.set(@md5_field, message.md5_of_body) if @md5_field
  event.set(@sent_timestamp_field, convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field
  event
end
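
`convert_epoch_to_timestamp` is not shown on this page. SQS reports `SentTimestamp` as a string holding the epoch time in milliseconds, so a conversion along the following lines is plausible. This is a sketch using plain Ruby `Time` rather than Logstash's own timestamp class, and `epoch_millis_to_time` is a hypothetical name.

```ruby
require "time"

# Hypothetical stand-in for convert_epoch_to_timestamp: converts an
# epoch-milliseconds string (as SQS returns for SentTimestamp) to a UTC Time.
# Integer arithmetic avoids floating-point rounding of the millisecond part.
def epoch_millis_to_time(epoch_millis)
  millis = epoch_millis.to_i
  Time.at(millis / 1000, millis % 1000, :millisecond).utc
end

puts epoch_millis_to_time("1500000000123").iso8601(3)
# => 2017-07-14T02:40:00.123Z
```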

#handle_message(message, output_queue) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 146

def handle_message(message, output_queue)
  @codec.decode(message.body) do |event|
    add_sqs_data(event, message)
    decorate(event)
    output_queue << event
  end
end
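
Because the codec yields once per decoded event, a single SQS message can produce several events, and each one receives the same SQS metadata. The sketch below illustrates that flow outside Logstash. `Message`, `LineCodec`, and `handle` are hypothetical stand-ins, events are plain hashes, and the output queue is an array.

```ruby
# Hypothetical stand-in for an SQS message.
Message = Struct.new(:message_id, :body)

# Hypothetical codec: yields one event (a Hash) per non-empty line.
class LineCodec
  def decode(data)
    data.each_line do |line|
      yield({ "message" => line.chomp }) unless line.strip.empty?
    end
  end
end

# Mirrors the shape of handle_message: decode, annotate, enqueue.
def handle(message, codec, output_queue, id_field: nil)
  codec.decode(message.body) do |event|
    event[id_field] = message.message_id if id_field
    output_queue << event
  end
end

queue = []
handle(Message.new("id-1", "first\nsecond\n"), LineCodec.new, queue, id_field: "sqs_id")
p queue.size # => 2
p queue.first # => {"message"=>"first", "sqs_id"=>"id-1"}
```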

#polling_options ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 131

def polling_options
  { 
    :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
    :attribute_names => SQS_ATTRIBUTES,
    :wait_time_seconds => @polling_frequency
  }
end

#queue_url(aws_sqs_client) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 112

def queue_url(aws_sqs_client)
  if @queue_owner_aws_account_id
    return aws_sqs_client.get_queue_url({:queue_name => @queue, :queue_owner_aws_account_id => @queue_owner_aws_account_id})[:queue_url]
  else
    return aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url]
  end
end
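
The two branches differ only in whether the optional `:queue_owner_aws_account_id` key is sent. As an illustration of that logic, the request parameters could be built once and the owner added only when set. `queue_url_params` is a hypothetical helper, not part of the plugin, and the account ID below is a placeholder.

```ruby
# Hypothetical helper: builds the parameter hash for get_queue_url,
# adding the owner account ID only when one is configured.
def queue_url_params(queue, owner_account_id = nil)
  params = { :queue_name => queue }
  params[:queue_owner_aws_account_id] = owner_account_id if owner_account_id
  params
end

p queue_url_params("Logstash")
# => {:queue_name=>"Logstash"}
p queue_url_params("Logstash", "123456789012").keys
# => [:queue_name, :queue_owner_aws_account_id]
```

Under this assumption, the method body would reduce to a single `aws_sqs_client.get_queue_url(...)[:queue_url]` call on the resulting hash.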

#register ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 105

def register
  require "aws-sdk"
  @logger.info("Registering SQS input", :queue => @queue, :queue_owner_aws_account_id => @queue_owner_aws_account_id)

  setup_queue
end

#run(output_queue) ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 154

def run(output_queue)
  @logger.debug("Polling SQS queue", :polling_options => polling_options)

  run_with_backoff do
    poller.poll(polling_options) do |messages, stats|
      break if stop?
      messages.each {|message| handle_message(message, output_queue) }
      @logger.debug("SQS Stats:", :request_count => stats.request_count,
                    :received_message_count => stats.received_message_count,
                    :last_message_received_at => stats.last_message_received_at) if @logger.debug?
    end
  end
end

#setup_queue ⇒ Object



# File 'lib/logstash/inputs/sqs.rb', line 120

def setup_queue
  aws_sqs_client = Aws::SQS::Client.new(aws_options_hash || {})
  poller = Aws::SQS::QueuePoller.new(queue_url(aws_sqs_client), :client => aws_sqs_client)
  poller.before_request { |stats| throw :stop_polling if stop? }

  @poller = poller
rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
  @logger.error("Cannot establish connection to Amazon SQS", exception_details(e))
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end
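
The `before_request` callback stops the poller by throwing `:stop_polling`, which the poller catches around its polling loop. The sketch below shows that `catch`/`throw` mechanism in plain Ruby. `FakePoller` is a hypothetical stand-in that only mimics the control flow; it is not the AWS SDK class and makes no network calls.

```ruby
# Hypothetical poller that mimics a catch(:stop_polling) polling loop.
class FakePoller
  def before_request(&block)
    @before_request = block
  end

  # Yields a request counter until the callback throws :stop_polling.
  # Returns the number of requests made.
  def poll
    requests = 0
    catch(:stop_polling) do
      loop do
        @before_request&.call(requests)
        requests += 1
        yield requests
      end
    end
    requests
  end
end

stop = false
poller = FakePoller.new
poller.before_request { |_stats| throw :stop_polling if stop }

# Ask the poller to stop after the third request has been handled.
total = poller.poll { |n| stop = true if n == 3 }
p total # => 3
```

Checking the stop flag before each request means a shutdown takes effect at the next request boundary rather than in the middle of handling a batch.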