Class: Aws::SQS::QueuePoller

Inherits:
Object
Defined in:
lib/aws-sdk-sqs/queue_poller.rb

Overview

A utility class for long polling messages in a loop. **Messages are automatically deleted from the queue at the end of the given block.**

poller = Aws::SQS::QueuePoller.new(queue_url)

poller.poll do |msg|
  puts msg.body
end

## Long Polling

By default, messages are received using long polling. This method will force a default `:wait_time_seconds` of 20 seconds. If you prefer to use the queue default wait time, then pass a `nil` value for `:wait_time_seconds`.

# disables 20 second default, use queue ReceiveMessageWaitTimeSeconds
poller.poll(wait_time_seconds:nil) do |msg|
  # ...
end

When disabling `:wait_time_seconds` by passing `nil`, you must ensure the queue `ReceiveMessageWaitTimeSeconds` attribute is set to a non-zero value, or you will be short-polling. This will trigger significantly more API calls.
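
To make the difference concrete, here is a rough back-of-the-envelope sketch for an idle (empty) queue. The 50 ms round trip for a short poll is an assumed figure chosen for illustration, not a documented or measured value, and `idle_requests_per_hour` is a hypothetical helper.

```ruby
MILLISECONDS_PER_HOUR = 3_600_000

# Estimates how many receive requests a single poller makes per hour against
# an EMPTY queue, given how long one request takes to return (in milliseconds).
def idle_requests_per_hour(request_duration_ms)
  MILLISECONDS_PER_HOUR / request_duration_ms
end

# Long polling: each request waits the full 20 seconds before returning empty.
long_poll = idle_requests_per_hour(20_000)

# Short polling: ASSUMPTION of roughly 50 ms per request round trip.
short_poll = idle_requests_per_hour(50)

puts "long polling:  #{long_poll} requests/hour"
puts "short polling: #{short_poll} requests/hour"
```

Under these assumptions the short-polling loop issues several hundred times as many requests while the queue is empty.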

## Batch Receiving Messages

You can specify the maximum number of messages to receive with each polling attempt via `:max_number_of_messages`. When this is set to a value greater than 1, the block receives an array of messages instead of a single message.

# receives and yields 1 message at a time
poller.poll do |msg|
  # ...
end

# receives and yields up to 10 messages at a time
poller.poll(max_number_of_messages:10) do |messages|
  messages.each do |msg|
    # ...
  end
end

The maximum value for `:max_number_of_messages` is enforced by Amazon SQS.
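
Because the block argument changes shape with `:max_number_of_messages`, a handler shared between both modes may want to normalize its input first. The following is a minimal sketch: `StubMessage` is a hypothetical stand-in Struct and `each_message` is an illustrative helper, neither is part of the SDK. It deliberately avoids `Kernel#Array`, which would call `#to_a` on a Struct such as the stand-in and split it into its fields.

```ruby
# Hypothetical stand-in for a received message (not the SDK's message type).
StubMessage = Struct.new(:message_id, :body)

# Yields each message, whether the poll block was handed a single message
# or an array of messages.
def each_message(msg_or_batch, &block)
  batch = msg_or_batch.is_a?(Array) ? msg_or_batch : [msg_or_batch]
  batch.each(&block)
end

single = StubMessage.new("1", "hello")
batch  = [StubMessage.new("2", "foo"), StubMessage.new("3", "bar")]

bodies = []
each_message(single) { |msg| bodies << msg.body }
each_message(batch)  { |msg| bodies << msg.body }

puts bodies.inspect
```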

## Visibility Timeouts

When receiving messages, you have a fixed amount of time to process and delete the message before it is added back into the queue. This is the visibility timeout. By default, the queue's `VisibilityTimeout` attribute is used. You can provide an alternative visibility timeout when polling.

# queue default VisibilityTimeout
poller.poll do |msg|
end

# custom visibility timeout
poller.poll(visibility_timeout:10) do |msg|
end

You can reset the visibility timeout of a single message by calling #change_message_visibility_timeout. This is useful when you need more time to finish processing the message.

poller.poll do |msg|

  # do work ...

  # need more time for processing
  poller.change_message_visibility_timeout(msg, 60)

  # finish work ...

end

If you change the visibility timeout of a message to zero, it will return to the queue immediately.
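
For work that runs in several stages, one option is to reset the timeout before each remaining stage. The sketch below assumes only that the poller responds to `#change_message_visibility_timeout(message, seconds)`. `process_in_stages` and `RecordingPoller` are hypothetical names; the recording stand-in replaces a real poller so the example makes no SQS calls.

```ruby
# Hypothetical stand-in that records calls instead of contacting SQS.
class RecordingPoller
  attr_reader :extensions

  def initialize
    @extensions = []
  end

  def change_message_visibility_timeout(message, seconds)
    @extensions << [message, seconds]
  end
end

# Runs each stage in order, resetting the visibility timeout to `seconds`
# before every stage after the first.
def process_in_stages(poller, message, stages, seconds:)
  stages.each_with_index do |stage, index|
    poller.change_message_visibility_timeout(message, seconds) if index > 0
    stage.call(message)
  end
end

poller = RecordingPoller.new
log = []
stages = [
  ->(msg) { log << "parsed #{msg}" },
  ->(msg) { log << "stored #{msg}" },
  ->(msg) { log << "notified #{msg}" }
]

process_in_stages(poller, "msg-1", stages, seconds: 60)

puts log.inspect
puts poller.extensions.inspect
```

Inside a real `#poll` block, the same helper could be handed the actual poller and message instead of the stand-ins.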

## Deleting Messages

Messages are deleted from the queue when the block returns normally.

poller.poll do |msg|
  # do work
end # messages deleted here

You can skip message deletion by passing `skip_delete: true`. This allows you to manually delete the messages using #delete_message, or #delete_messages.

# single message
poller.poll(skip_delete: true) do |msg|
  poller.delete_message(msg) # if successful
end

# batch delete messages
poller.poll(skip_delete: true, max_number_of_messages:10) do |messages|
  poller.delete_messages(messages)
end

Another way to manage message deletion is to throw `:skip_delete` from the poll block. This lets you decide, for each message or message batch, whether it is deleted. It is especially useful when you rescue a temporary error and want the message to time out and return to the queue for another attempt.

poller.poll do |msg|
  begin
    # do work
  rescue
    # unexpected error occurred while processing messages,
    # log it, and skip delete so it can be re-processed later
    throw :skip_delete
  end
end
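
The catch/throw mechanics behind this can be illustrated without SQS. The following is a sketch of one way a poller could gate deletion on a `:skip_delete` throw; it is not presented as the SDK's actual implementation, and `handle_message` and the `deleted` list are hypothetical.

```ruby
# Runs the block for one message and "deletes" it afterwards, unless the
# block throws :skip_delete. The `deleted` array stands in for a real
# delete call against the queue.
def handle_message(message, deleted)
  catch(:skip_delete) do
    yield message
    deleted << message # reached only when the block returns normally
  end
end

deleted = []

# Block returns normally, so the message is deleted.
handle_message("ok", deleted) { |msg| msg.upcase }

# Block hits an error and throws, so the delete step is skipped.
handle_message("bad", deleted) do |msg|
  begin
    raise ArgumentError, "cannot process #{msg}"
  rescue ArgumentError
    throw :skip_delete # leave the message for a later attempt
  end
end

puts deleted.inspect
```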

## Terminating the Polling Loop

By default, polling will continue indefinitely. You can stop the poller by providing an idle timeout or by throwing `:stop_polling` from the #before_request callback.

### `:idle_timeout` Option

This is a configurable maximum number of seconds to wait for a new message before the polling loop exits. By default, there is no idle timeout.

# stops polling after a minute of no received messages
poller.poll(idle_timeout: 60) do |msg|
  # ...
end

### Throw `:stop_polling`

If you want more fine-grained control, you can configure a #before_request callback that runs before each long poll. Throwing `:stop_polling` from this callback causes the poller to exit normally without making the next request.

# stop after processing 100 messages
poller.before_request do |stats|
  throw :stop_polling if stats.received_message_count >= 100
end

poller.poll do |msg|
  # do work ...
end

## Tracking Progress

The poller automatically tracks a few client-side statistics in a PollerStats object. You can access these statistics in three ways:

  • Configure a #before_request callback.

    poller.before_request do |stats|
      logger.info("requests: #{stats.request_count}")
      logger.info("messages: #{stats.received_message_count}")
      logger.info("last-timestamp: #{stats.last_message_received_at}")
    end

  • Accept a 2nd argument in the poll block, for example:

    poller.poll do |msg, stats|
      logger.info("requests: #{stats.request_count}")
      logger.info("messages: #{stats.received_message_count}")
      logger.info("last-timestamp: #{stats.last_message_received_at}")
    end

  • Return value:

    stats = poller.poll(idle_timeout: 10) do |msg|
      # do work ...
    end
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")

Defined Under Namespace

Classes: PollerConfig, PollerStats

Constructor Details

#initialize(queue_url, options = {}) ⇒ QueuePoller

Returns a new instance of QueuePoller.

Parameters:

  • queue_url (String)
  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :client (Client)
  • :wait_time_seconds (Integer) — default: 20

    The long polling interval. Messages are yielded as soon as they are received. The `:wait_time_seconds` option specifies the max duration for each polling attempt before a new request is sent to receive messages.

  • :max_number_of_messages (Integer) — default: 1

    The maximum number of messages to yield from each polling attempt. Values can be from 1 to 10.

  • :visibility_timeout (Integer) — default: nil

    The number of seconds you have to process a message before it is put back into the queue and can be received again. By default, this option is not set and the queue's `VisibilityTimeout` attribute applies.

  • :attribute_names (Array<String>) — default: []

    The list of attributes that need to be returned along with each message. Valid attribute names include:

    • `All` - All attributes.

    • `ApproximateFirstReceiveTimestamp` - The time when the message was first received from the queue (epoch time in milliseconds).

    • `ApproximateReceiveCount` - The number of times a message has been received from the queue but not deleted.

    • `SenderId` - The AWS account number (or the IP address, if anonymous access is allowed) of the sender.

    • `SentTimestamp` - The time when the message was sent to the queue (epoch time in milliseconds).
      
  • :message_attribute_names (Array<String>) — default: []

    A list of message attributes to receive. You can receive all message attributes by using `All` or `.*`. You can also use `foo.*` to return all message attributes starting with the `foo` prefix.

  • :idle_timeout (Integer) — default: nil

    Polling terminates gracefully when `:idle_timeout` seconds have passed without receiving any messages.

  • :skip_delete (Boolean) — default: false

    When `true`, messages are not deleted after the polling block returns. If you wish to delete received messages, you will need to call `#delete_message` or `#delete_messages` manually.

  • :before_request (Proc) — default: nil

    Called before each polling attempt. This proc receives a single argument, an instance of PollerStats.


# File 'lib/aws-sdk-sqs/queue_poller.rb', line 210

def initialize(queue_url, options = {})
  @queue_url = queue_url
  @client = options.delete(:client) || Client.new
  @default_config = PollerConfig.new(options)
end

Instance Attribute Details

#client ⇒ Client (readonly)

Returns:

  • (Client)

# File 'lib/aws-sdk-sqs/queue_poller.rb', line 220

def client
  @client
end

#default_config ⇒ PollerConfig (readonly)

Returns:

  • (PollerConfig)

# File 'lib/aws-sdk-sqs/queue_poller.rb', line 223

def default_config
  @default_config
end

#queue_url ⇒ String (readonly)

Returns:

  • (String)

# File 'lib/aws-sdk-sqs/queue_poller.rb', line 217

def queue_url
  @queue_url
end

Instance Method Details

#before_request {|stats| ... } ⇒ void

This method returns an undefined value.

Registers a callback that is invoked once before every polling attempt.

poller.before_request do |stats|
  logger.info("requests: #{stats.request_count}")
  logger.info("messages: #{stats.received_message_count}")
  logger.info("last-timestamp: #{stats.last_message_received_at}")
end

poller.poll do |msg|
  # do work ...
end

## `:stop_polling`

If you throw `:stop_polling` from the #before_request callback, then the poller will exit normally before making the next long poll request.

poller.before_request do |stats|
  throw :stop_polling if stats.received_message_count >= 100
end

# at most 100 messages will be yielded
poller.poll do |msg|
  # do work ...
end

Yield Parameters:

  • stats (PollerStats)

    An object that tracks a few client-side statistics about the queue polling.


# File 'lib/aws-sdk-sqs/queue_poller.rb', line 257

def before_request(&block)
  @default_config = @default_config.with(before_request: block) if block_given?
end

#change_message_visibility_timeout(message, seconds) ⇒ Object

Note:

This method should be called from inside a #poll block.

Parameters:

  • message (#receipt_handle)

    An object that responds to `#receipt_handle`.

  • seconds (Integer)

# File 'lib/aws-sdk-sqs/queue_poller.rb', line 350

def change_message_visibility_timeout(message, seconds)
  @client.change_message_visibility({
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle,
    visibility_timeout: seconds,
  })
end

#delete_message(message) ⇒ Object

Note:

This method should be called from inside a #poll block.

Parameters:

  • message (#receipt_handle)

    An object that responds to `#receipt_handle`.


# File 'lib/aws-sdk-sqs/queue_poller.rb', line 361

def delete_message(message)
  @client.delete_message({
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle,
  })
end

#delete_messages(messages) ⇒ Object

Note:

This method should be called from inside a #poll block.

Parameters:

  • messages (Array<#message_id, #receipt_handle>)

    An array of received messages. Each object must respond to `#message_id` and `#receipt_handle`.


# File 'lib/aws-sdk-sqs/queue_poller.rb', line 372

def delete_messages(messages)
  @client.delete_message_batch(
    queue_url: @queue_url,
    entries: messages.map { |msg|
      { id: msg.message_id, receipt_handle: msg.receipt_handle }
    }
  )
end
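
Since #delete_messages only requires objects that respond to `#message_id` and `#receipt_handle`, the batch payload can be previewed with plain Ruby objects. In this sketch `Receipt` is a hypothetical Struct used purely for illustration; the mapping expression mirrors the one in the method body above, and no request is sent.

```ruby
# Hypothetical duck-typed message: anything with these two readers would do.
Receipt = Struct.new(:message_id, :receipt_handle)

messages = [
  Receipt.new("id-1", "handle-1"),
  Receipt.new("id-2", "handle-2")
]

# Same transformation #delete_messages applies before calling
# delete_message_batch: one { id:, receipt_handle: } entry per message.
entries = messages.map do |msg|
  { id: msg.message_id, receipt_handle: msg.receipt_handle }
end

puts entries.inspect
```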

#poll(options = {}, &block) ⇒ PollerStats

Polls the queue, yielding a message or an array of messages to the given block. Messages are automatically deleted from the queue at the end of the block. See the class documentation on Aws::SQS::QueuePoller for more examples.

Examples:

Basic example, loops indefinitely


poller.poll do |msg|
  # ...
end

Receives and deletes messages as a batch


poller.poll(max_number_of_messages:10) do |messages|
  messages.each do |msg|
    # ...
  end
end

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :wait_time_seconds (Integer) — default: 20

    The long polling interval. Messages are yielded as soon as they are received. The `:wait_time_seconds` option specifies the max duration for each polling attempt before a new request is sent to receive messages.

  • :max_number_of_messages (Integer) — default: 1

    The maximum number of messages to yield from each polling attempt. Values can be from 1 to 10.

  • :visibility_timeout (Integer) — default: nil

    The number of seconds you have to process a message before it is put back into the queue and can be received again. By default, this option is not set and the queue's `VisibilityTimeout` attribute applies.

  • :attribute_names (Array<String>) — default: []

    The list of attributes that need to be returned along with each message. Valid attribute names include:

    • `All` - All attributes.

    • `ApproximateFirstReceiveTimestamp` - The time when the message was first received from the queue (epoch time in milliseconds).

    • `ApproximateReceiveCount` - The number of times a message has been received from the queue but not deleted.

    • `SenderId` - The AWS account number (or the IP address, if anonymous access is allowed) of the sender.

    • `SentTimestamp` - The time when the message was sent to the queue (epoch time in milliseconds).
      
  • :message_attribute_names (Array<String>) — default: []

    A list of message attributes to receive. You can receive all message attributes by using `All` or `.*`. You can also use `foo.*` to return all message attributes starting with the `foo` prefix.

  • :idle_timeout (Integer) — default: nil

    Polling terminates gracefully when `:idle_timeout` seconds have passed without receiving any messages.

  • :skip_delete (Boolean) — default: false

    When `true`, messages are not deleted after the polling block returns. If you wish to delete received messages, you will need to call `#delete_message` or `#delete_messages` manually.

  • :before_request (Proc) — default: nil

    Called before each polling attempt. This proc receives a single argument, an instance of PollerStats.

Returns:

  • (PollerStats)

# File 'lib/aws-sdk-sqs/queue_poller.rb', line 329

def poll(options = {}, &block)
  config = @default_config.with(options)
  stats = PollerStats.new
  catch(:stop_polling) do
    loop do
      messages = get_messages(config, stats)
      if messages.empty?
        check_idle_timeout(config, stats, messages)
      else
        process_messages(config, stats, messages, &block)
      end
    end
  end
  stats.polling_stopped_at = Time.now
  stats
end
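
The control flow of #poll can be mirrored in a few lines of plain Ruby. The sketch below swaps the SQS calls for an in-memory array and uses a hypothetical `MiniStats` struct and `mini_poll` method. It assumes the callback runs immediately before each receive, and it only illustrates how `catch(:stop_polling)` lets a callback end the loop while the stats are still returned.

```ruby
# Hypothetical, trimmed-down stand-in for the stats object.
MiniStats = Struct.new(:request_count, :received_message_count, :polling_stopped_at)

# In-memory imitation of the loop in #poll. `queue` is an Array standing in
# for SQS, and `before_request` plays the role of the #before_request callback.
def mini_poll(queue, before_request:)
  stats = MiniStats.new(0, 0, nil)
  catch(:stop_polling) do
    loop do
      before_request.call(stats) # may throw :stop_polling
      stats.request_count += 1
      message = queue.shift
      next if message.nil? # empty receive; the real poller checks the idle timeout here
      stats.received_message_count += 1
      yield message
    end
  end
  # Execution resumes here after the throw, so stats are always returned.
  stats.polling_stopped_at = Time.now
  stats
end

seen = []
stop_after_three = lambda do |stats|
  throw :stop_polling if stats.received_message_count >= 3
end

stats = mini_poll(%w[a b c d e], before_request: stop_after_three) do |msg|
  seen << msg
end

puts seen.inspect
puts "requests: #{stats.request_count}, messages: #{stats.received_message_count}"
```

Because the check happens before the next request rather than after each message, the loop stops without fetching a fourth message.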