Class: AWS::SQS::Queue

Inherits:
Object
  • Object
show all
Includes:
Core::Model
Defined in:
lib/aws/sqs/queue.rb

Overview

Represents an Amazon SQS Queue.

Examples:

Sending a message

msg = queue.send_message("HELLO")
puts "Sent message: #{msg.id}"

Polling for messages indefinitely

queue.poll do |msg|
  puts "Got message: #{msg.body}"
end

Defined Under Namespace

Modules: PolicyProxy Classes: SentMessage

Constant Summary collapse

DEFAULT_POLL_INTERVAL =

The default number of seconds to wait between polling requests for new messages.

1

Instance Attribute Summary collapse

Attributes included from Core::Model

#config

Instance Method Summary collapse

Methods included from Core::Model

#client, #config_prefix

Constructor Details

#initialize(url, opts = {}) ⇒ Queue

Returns a new instance of Queue.



40
41
42
43
# File 'lib/aws/sqs/queue.rb', line 40

def initialize(url, opts = {})
  @url = url
  super
end

Instance Attribute Details

#urlString (readonly)

Returns The queue URL.

Returns:

  • (String)

    The queue URL.



37
38
39
# File 'lib/aws/sqs/queue.rb', line 37

def url
  @url
end

Instance Method Details

#==(other) ⇒ Boolean Also known as: eql?

Returns true if the other queue has the same url.

Returns:

  • (Boolean)

    Returns true if the other queue has the same url.



620
621
622
# File 'lib/aws/sqs/queue.rb', line 620

def ==(other)
  other.kind_of?(Queue) and other.url == url
end

#approximate_number_of_messagesInteger Also known as: visible_messages

Returns The approximate number of visible messages in a queue. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.

Returns:



278
279
280
# File 'lib/aws/sqs/queue.rb', line 278

def approximate_number_of_messages
  get_attribute("ApproximateNumberOfMessages").to_i
end

#approximate_number_of_messages_delayedInteger

Returns an approximate count of messages delayed.

Returns:

  • (Integer)

    Returns an approximate count of messages delayed.



374
375
376
# File 'lib/aws/sqs/queue.rb', line 374

def approximate_number_of_messages_delayed
  get_attribute("ApproximateNumberOfMessagesDelayed").to_i
end

#approximate_number_of_messages_not_visibleInteger Also known as: invisible_messages

Returns The approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.

Returns:

  • (Integer)

    The approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.



288
289
290
# File 'lib/aws/sqs/queue.rb', line 288

def approximate_number_of_messages_not_visible
  get_attribute("ApproximateNumberOfMessagesNotVisible").to_i
end

#arnString

Returns The queue’s Amazon resource name (ARN).

Returns:

  • (String)

    The queue’s Amazon resource name (ARN).



379
380
381
# File 'lib/aws/sqs/queue.rb', line 379

def arn
  @arn ||= get_attribute("QueueArn")
end

#batch_change_message_visibility(visibility_timeout, *messages) ⇒ nil #batch_change_message_visibility(*messages_with_timeouts) ⇒ nil

Overloads:

  • #batch_change_message_visibility(visibility_timeout, *messages) ⇒ nil

    Accepts a single :visibility_timeout value and a list of messages (ReceivedMessage objects or receipt handle strings).

    This form of the method is useful when you want to set the same timeout value for each message.

    queue.bacth_change_message_visibility(10, messages)
    

    Parameters:

    • visibility_timeout (Integer)

      The new value for the message’s visibility timeout (in seconds).

    • message (ReceivedMessage, String)

      A list of up to 10 messages to change the visibility timeout for.

    Returns:

    • (nil)

    Raises:

    • (BatchChangeVisibilityError)

      Raises this error when one or more of the messages failed the visibility update.

  • #batch_change_message_visibility(*messages_with_timeouts) ⇒ nil

    Accepts a list of hashes. Each hash should provide the visibility timeout and message (a ReceivedMessage object or the recipt handle string).

    Use this form when each message needs a different visiblity timeout.

    messages = []
    messages << { :message => 'handle1', :visibility_timeout => 5 }
    messages << { :message => 'handle2', :visibility_timeout => 10 }
    
    queue.bacth_change_message_visibility(*messages)
    

    Parameters:

    • message (Hash)

      A list hashes, each with a :visibility_timeout and a :message.

    Returns:

    • (nil)

    Raises:

    • (BatchChangeVisibilityError)

      Raises this error when one or more of the messages failed the visibility update.

Raises:



585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
# File 'lib/aws/sqs/queue.rb', line 585

def batch_change_visibility *args
  
  args = args.flatten
  
  if args.first.is_a?(Integer)
    timeout = args.shift
    messages = args.collect{|m| [m, timeout] }
  else
    messages = args.collect{|m| [m[:message], m[:visibility_timeout]] }
  end

  entries = []
  messages.each do |msg,timeout|
    handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
    entries << {
      :id => entries.size.to_s, 
      :receipt_handle => handle,
      :visibility_timeout => timeout,
    }
  end

  response = client.change_message_visibility_batch(
    :queue_url => url, :entries => entries)

  failures = batch_failures(entries, response)

  raise Errors::BatchChangeVisibilityError.new(failures) unless 
    failures.empty?

  nil

end

#batch_delete(*messages) ⇒ nil

Parameters:

  • messages (ReceivedMessage, String)

    A list of up to 10 messages to delete. Each message should be a ReceivedMessage object or a received message handle (string).

Returns:

  • (nil)

Raises:

  • (Errors::BatchDeleteSend)

    Raised when one or more of the messages failed to delete. The raised error has a list of the failures.



524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
# File 'lib/aws/sqs/queue.rb', line 524

def batch_delete *messages

  entries = []
  messages.flatten.each_with_index do |msg,n|
    handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
    entries << { :id => n.to_s, :receipt_handle => handle }
  end

  response = client.delete_message_batch(
    :queue_url => url, :entries => entries)

  failures = batch_failures(entries, response)

  raise Errors::BatchDeleteError.new(failures) unless failures.empty?

  nil

end

#batch_send(*messages) ⇒ Array<SentMessage>

Sends a batch of up to 10 messages in a single request.

queue.send_messages('message-1', 'message-2')

You can also set an optional delay for all of the messages:

# delay all messages 1 hour
queue.batch_send(msg1, msg2, :delay_seconds => 3600)

If you need to set a custom delay for each message you can pass hashes:

messages = []
messages << { :message_body => 'msg1', :delay_seconds => 60 }
messages << { :message_body => 'msg2', :delay_seconds => 30 }

queue.batch_send(messages)

Parameters:

  • messages (String, Hash)

    A list of messages. Each message should be a string, or a hash with a :message_body, and optionally :delay_seconds.

Returns:

  • (Array<SentMessage>)

    Returns an array of sent message objects. Each object responds to #message_id and #md5_of_message_body. The message id is generated by Amazon SQS.

Raises:

  • (Errors::BatchSendError)

    Raises this error when one or more of the messages failed to send, but others did. On the raised object you can access a list of the messages that failed, and a list of messages that succeeded.



479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
# File 'lib/aws/sqs/queue.rb', line 479

def batch_send *messages

  entries = messages.flatten

  unless entries.first.is_a?(Hash)
    options = entries.last.is_a?(Hash) ? entries.pop : {}
    entries = entries.collect{|msg| { :message_body => msg } }
    if delay = options[:delay_seconds]
      entries.each {|entry| entry[:delay_seconds] = delay }
    end
  end

  entries.each_with_index {|entry,n| entry[:id] = n.to_s }

  client_opts = {}
  client_opts[:queue_url] = url
  client_opts[:entries] = entries

  response = client.send_message_batch(client_opts)

  failed = batch_failures(entries, response)

  sent = response[:successful].collect do |sent|
    msg = SentMessage.new
    msg.message_id = sent[:message_id]
    msg.md5 = sent[:md5_of_message_body]
    msg
  end

  raise Errors::BatchSendError.new(sent, failed) unless failed.empty?

  sent
  
end

#created_timestampTime

Returns The time when the queue was created.

Returns:

  • (Time)

    The time when the queue was created.



317
318
319
# File 'lib/aws/sqs/queue.rb', line 317

def created_timestamp
  Time.at(get_attribute("CreatedTimestamp").to_i)
end

#delay_secondsInteger

Returns Gets the current default delay for messages sent to the queue.

Returns:

  • (Integer)

    Gets the current default delay for messages sent to the queue.



363
364
365
# File 'lib/aws/sqs/queue.rb', line 363

def delay_seconds
  get_attribute("DelaySeconds").to_i
end

#delay_seconds=(seconds) ⇒ Object

Sets the default delay for messages sent to the queue.

Parameters:

  • seconds (Integer)

    How many seconds a message will be delayed.



369
370
371
# File 'lib/aws/sqs/queue.rb', line 369

def delay_seconds= seconds
  set_attribute("DelaySeconds", seconds.to_s)
end

#deletenil

Deletes the queue, regardless of whether it is empty.

When you delete a queue, the deletion process takes up to 60 seconds. Requests you send involving that queue during the 60 seconds might succeed. For example, calling #send_message might succeed, but after the 60 seconds, the queue and that message you sent no longer exist.

Also, when you delete a queue, you must wait at least 60 seconds before creating a queue with the same name.

Returns:

  • (nil)


56
57
58
59
# File 'lib/aws/sqs/queue.rb', line 56

def delete
  client.delete_queue(:queue_url => url)
  nil
end

#exists?Boolean

Note:

This may raise an exception if you don’t have permission to access the queue attributes. Also, it may return true for up to 60 seconds after a queue has been deleted.

Returns True if the queue exists.

Returns:

  • (Boolean)

    True if the queue exists.



389
390
391
392
393
394
395
396
# File 'lib/aws/sqs/queue.rb', line 389

def exists?
  client.get_queue_attributes(:queue_url => url,
                              :attribute_names => ["QueueArn"])
rescue Errors::NonExistentQueue, Errors::InvalidAddress
  false
else
  true
end

#inspectObject



626
627
628
# File 'lib/aws/sqs/queue.rb', line 626

def inspect
  "<#{self.class}:#{url}>"
end

#last_modified_timestampTime

Returns The time when the queue was last changed.

Returns:

  • (Time)

    The time when the queue was last changed.



322
323
324
# File 'lib/aws/sqs/queue.rb', line 322

def last_modified_timestamp
  Time.at(get_attribute("LastModifiedTimestamp").to_i)
end

#maximum_message_sizeInteger

Returns The limit of how many bytes a message can contain before Amazon SQS rejects it.

Returns:

  • (Integer)

    The limit of how many bytes a message can contain before Amazon SQS rejects it.



328
329
330
# File 'lib/aws/sqs/queue.rb', line 328

def maximum_message_size
  get_attribute("MaximumMessageSize").to_i
end

#maximum_message_size=(size) ⇒ Object

Sets the maximum message size for the queue.

Parameters:

  • size (Integer)

    The limit of how many bytes a message can contain before Amazon SQS rejects it. This must be an integer from 1024 bytes (1KB) up to 65536 bytes (64KB). The default for this attribute is 8192 (8KB).

Returns:

  • Retuns the passed size argument.



339
340
341
# File 'lib/aws/sqs/queue.rb', line 339

def maximum_message_size=(size)
  set_attribute("MaximumMessageSize", size.to_s)
end

#message_retention_periodInteger

Returns The number of seconds Amazon SQS retains a message.

Returns:

  • (Integer)

    The number of seconds Amazon SQS retains a message.



345
346
347
# File 'lib/aws/sqs/queue.rb', line 345

def message_retention_period
  get_attribute("MessageRetentionPeriod").to_i
end

#message_retention_period=(period) ⇒ Object

Sets the message retention period for the queue

Parameters:

  • period (Integer)

    The number of seconds Amazon SQS retains a message. Must be an integer from 3600 (1 hour) to 1209600 (14 days). The default for this attribute is 345600 (4 days).

Returns:

  • Returns the passed period argument.



356
357
358
359
# File 'lib/aws/sqs/queue.rb', line 356

def message_retention_period=(period)
  set_attribute("MessageRetentionPeriod", period.to_s)
  period
end

#policyPolicy

Returns the current queue policy if there is one. Returns nil otherwise.

Returns:

  • (Policy)

    Returns the current queue policy if there is one. Returns nil otherwise.



416
417
418
419
420
421
422
423
424
425
# File 'lib/aws/sqs/queue.rb', line 416

def policy
  if policy_json = get_attribute('Policy')
    policy = SQS::Policy.from_json(policy_json)
    policy.extend(PolicyProxy)
    policy.queue = self
    policy
  else
    nil
  end
end

#policy=(policy) ⇒ nil

Set the policy on this queue.

If you pass nil or an empty string then it will have the same effect as deleting the policy.

Parameters:

  • policy

    The policy to set. This policy can be a Policy object, a json policy string, or any other object that responds with a policy string when it received #to_json.

Returns:

  • (nil)


438
439
440
441
442
443
444
445
446
# File 'lib/aws/sqs/queue.rb', line 438

def policy= policy
  policy_string = case policy
  when nil, '' then ''
  when String  then policy
  else policy.to_json
  end
  set_attribute('Policy', policy_string)
  nil
end

#poll(opts = {}) {|message| ... } ⇒ nil

Polls continually for messages. For example, you can use this to poll indefinitely:

queue.poll { |msg| puts msg.body }

Or, to poll indefinitely for the first message and then continue polling until no message is received for a period of at least ten seconds:

queue.poll(:initial_timeout => false,
           :idle_timeout => 10) { |msg| puts msg.body }

As with the block form of #receive_message, this method automatically deletes the message then the block exits normally.

Parameters:

  • opts (Hash) (defaults to: {})

    Options for polling.

Options Hash (opts):

  • :poll_interval (Float, Integer)

    The number of seconds to wait before retrying when no messages are received. The default is 1 second.

  • :idle_timeout (Integer)

    The maximum number of seconds to spend polling while no messages are being returned. By default this method polls indefinitely whether messages are received or not.

  • :initial_timeout (Integer)

    The maximum number of seconds to spend polling before the first message is received. This option defaults to the value of :idle_timeout. You can specify false to poll indefinitely for the first message when :idle_timeout is set.

  • :batch_size (Integer)

    The maximum number of messages to retrieve in a single request. By default messages are received one at a time. Valid values: integers from 1 to 10.

  • :visibilitiy_timeout (Integer)

    The duration (in seconds) that the received messages are hidden from subsequent retrieve requests. Valid values: integer from 0 to 43200 (maximum 12 hours)

  • :attributes (Array<Symbol, String>)

    The attributes to populate in each received message. Valid values:

    • :all (to populate all attributes)

    • :sender_id

    • :sent_at

    • :receive_count

    • :first_received_at

    See ReceivedMessage for documentation on each attribute’s meaning.

Yield Parameters:

Returns:

  • (nil)


251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/aws/sqs/queue.rb', line 251

def poll(opts = {}, &block)
  poll_interval = opts[:poll_interval] || DEFAULT_POLL_INTERVAL
  opts[:limit] = opts.delete(:batch_size) if
    opts.key?(:batch_size)

  last_message_at = Time.now
  got_first = false
  loop do
    got_msg = false
    receive_messages(opts) do |message|
      got_msg = got_first = true
      last_message_at = Time.now
      yield(message)
    end
    unless got_msg
      Kernel.sleep(poll_interval) unless poll_interval == 0
      return if hit_timeout?(got_first, last_message_at, opts)
    end
  end
  nil
end

#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage Also known as: receive_messages

Note:

Due to the distributed nature of the queue, a weighted random set of machines is sampled on a ReceiveMessage call. That means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per call to #receive_message. If the number of messages in the queue is extremely small, you might not receive any messages. To poll continually for messages, use the #poll method, which automatically retries the request after a configurable delay.

Retrieves one or more messages. When a block is given, each message is yielded to the block and then deleted as long as the block exits normally. When no block is given, you must delete the message yourself using ReceivedMessage#delete.

Parameters:

  • opts (Hash) (defaults to: {})

    Options for receiving messages.

Options Hash (opts):

  • :limit (Integer)

    The maximum number of messages to receive. By default this is 1, and the return value is a single message object. If this options is specified and is not 1, the return value is an array of message objects; however, the array may contain fewer objects than you requested. Valid values: integers from 1 to 10.

    Not necessarily all the messages in the queue are returned (for more information, see the preceding note about machine sampling).

  • :visibilitiy_timeout (Integer)

    The duration (in seconds) that the received messages are hidden from subsequent retrieve requests. Valid values: integer from 0 to 43200 (maximum 12 hours)

  • :attributes (Array<Symbol, String>)

    The attributes to populate in each received message. Valid values:

    • :all (to populate all attributes)

    • :sender_id

    • :sent_at

    • :receive_count

    • :first_received_at

    See ReceivedMessage for documentation on each attribute’s meaning.

Yield Parameters:

Returns:

  • (ReceivedMessage)

    Returns the received message (or messages) only if a block is not given to this method.



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/aws/sqs/queue.rb', line 172

def receive_message(opts = {}, &block)
  resp = client.receive_message(receive_opts(opts))

  messages = resp[:messages].map do |m|
    ReceivedMessage.new(self, m[:message_id], m[:receipt_handle],
      :body => m[:body],
      :md5 => m[:md5_of_body],
      :attributes => m[:attributes])
  end

  if block
    call_message_block(messages, block)
  elsif opts[:limit] && opts[:limit] != 1
    messages
  else
    messages.first
  end
end

#send_message(body, options = {}) ⇒ SentMessage

Delivers a message to this queue.

Parameters:

  • body (String)

    The message to send. The maximum allowed message size is 64 KB. The message may only contain Unicode characters from the following list, according to the W3C XML specification (for more information, go to www.w3.org/TR/REC-xml/#charsets). If you send any characters not included in the list, your request will be rejected.

    • #x9

    • #xA

    • #xD

    • #x20 to #xD7FF

    • #xE000 to #xFFFD

    • #x10000 to #x10FFFF

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :delay_seconds (Integer)

    The number of seconds to delay the message. The message will become available for processing after the delay time has passed. If you don’t specify a value, the default value for the queue applies. Should be from 0 to 900 (15 mins).

Returns:

  • (SentMessage)

    An object containing information about the message that was sent.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/aws/sqs/queue.rb', line 105

def send_message body, options = {}

  client_opts = options.dup
  client_opts[:queue_url] = url
  client_opts[:message_body] = body

  response = client.send_message(client_opts)

  msg = SentMessage.new
  msg.message_id = response[:message_id]
  msg.md5 = response[:md5_of_message_body]
  msg

end

#visibility_timeoutInteger

Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.

Returns:

  • (Integer)

    Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.



298
299
300
# File 'lib/aws/sqs/queue.rb', line 298

def visibility_timeout
  get_attribute("VisibilityTimeout").to_i
end

#visibility_timeout=(timeout) ⇒ Object

Sets the visibility timeout for the queue.

Parameters:

  • timeout (Integer)

    The length of time (in seconds) that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Valid values: integers from 0 to 43200 (12 hours).

Returns:

  • Returns the value passed as a timeout.



311
312
313
314
# File 'lib/aws/sqs/queue.rb', line 311

def visibility_timeout=(timeout)
  set_attribute("VisibilityTimeout", timeout.to_s)
  timeout
end