Class: SQS::Message
- Inherits:
-
Object
- Object
- SQS::Message
- Defined in:
- lib/sqs/message.rb
Overview
SQS::Message provides a light wrapper over the RightAWS SQS API. It embeds the message payload in a JSON container to allow enqueue time and time_in_queue calculations.
Note that by default, JSON serialization is used to encode contents. This means that contents must be JSON-serializable (no cyclical objects, singleton strings, etc.). If this is problematic, the serialization mechanism can be customized by overriding the serialize and self.deserialize methods.
To use, instantiate a message, set its contents, and call queue!. For example:
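As a rough illustration of what JSON-serializable means in practice, the sketch below runs the same to_json / JSON.parse round trip that serialize and deserialize are documented to perform by default. It uses only Ruby's json library and makes no SQS calls; the variable names are illustrative.

```ruby
require 'json'

# Plain-Ruby illustration of the default JSON round trip; no SQS calls are made.
original = { :var1 => 1, :var2 => 2 }

wire_format = original.to_json         # what the default serialize is documented to do
restored    = JSON.parse(wire_format)  # what the default deserialize is documented to do

puts wire_format           # prints {"var1":1,"var2":2}
puts restored == original  # prints false: symbol keys come back as strings
```

Symbol keys do not survive the round trip, which is why the contents of a received message are keyed by 'var1' rather than :var1.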
m = SQS::Message.new 'my_test_queue'
m.contents = {:var1 => 1, :var2 => 2}
m.queue!
This pushes a message containing {:var1 => 1, :var2 => 2} to the 'my_test_queue' queue on SQS.
To retrieve the message, simply call:
m = SQS::Message.receive 'my_test_queue'
You can then inspect the contents:
m.contents # returns {'var1' => 1, 'var2' => 2 }
remove the message from the SQS queue:
m.dequeue!
or check the time in queue:
m.time_in_queue
You can also retrieve the SQS id for the message with the id method.
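The JSON container mentioned in the Overview could look something like the sketch below. This is only an assumption about the general idea: the key names 'enqueued_at' and 'contents' and the helper methods wrap and unwrap are hypothetical and are not taken from the library.

```ruby
require 'json'
require 'time'

# Hypothetical envelope layout; the key names are assumptions for
# illustration, not the library's actual wire format.
def wrap(contents, enqueued_at)
  { 'enqueued_at' => enqueued_at.utc.iso8601(6), 'contents' => contents }.to_json
end

# Returns the payload and the enqueue time recovered from the envelope.
def unwrap(body)
  envelope = JSON.parse(body)
  [envelope['contents'], Time.iso8601(envelope['enqueued_at'])]
end

enqueued_at = Time.utc(2012, 1, 1, 12, 0, 0)
body = wrap({ 'var1' => 1 }, enqueued_at)

contents, restored_enqueued_at = unwrap(body)
received_at = Time.utc(2012, 1, 1, 12, 0, 30)

# Subtracting two Time objects yields the elapsed seconds as a Float.
elapsed = received_at - restored_enqueued_at
puts elapsed  # prints 30.0
```

Carrying the enqueue time inside the message body is what lets the receiving side compute a time in queue without any extra lookup.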
Instance Attribute Summary
-
#contents ⇒ Object
Payload to be sent across the wire.
-
#enqueued_at ⇒ Object
Returns the value of attribute enqueued_at.
-
#queue_name ⇒ Object
Returns the value of attribute queue_name.
-
#raw_message ⇒ Object
The raw RightAws message retrieved from SQS.
-
#received_at ⇒ Object
Returns the value of attribute received_at.
Class Method Summary
-
.deserialize(str) ⇒ Object
Given a string, deserialize an object.
-
.peek(queue_name, number_to_view = 1, opts = {}) ⇒ Object
Retrieves one or more messages from the queue, but sets their visibility timeout to zero so that they can immediately be picked up by another worker.
-
.receive(queue_name, number_to_receive = 1, opts = {}) ⇒ Object
Retrieves one or more Message objects from an SQS queue.
- queue_name is the name of the SQS queue
- number_to_receive sets the maximum number of messages to return.
Instance Method Summary
-
#dequeue! ⇒ Object
Call after retrieving to delete the message from SQS so that it cannot be retrieved by another client.
-
#id ⇒ Object
Returns the SQS id for this message; nil if the message has not yet been queued.
-
#initialize(new_queue_name) ⇒ Message
constructor
Initialize with the name of the SQS queue.
-
#queue! ⇒ Object
Serializes contents and pushes data onto the SQS queue.
-
#serialize(object) ⇒ Object
Given an object, serialize it to a string.
-
#time_in_queue ⇒ Object
Returns time message spent on queue in seconds.
Constructor Details
#initialize(new_queue_name) ⇒ Message
Initialize with the name of the SQS queue. When queue! is called on an instance, data will be pushed to this queue.
# File 'lib/sqs/message.rb', line 66
def initialize new_queue_name
  @queue_name = new_queue_name
end
Instance Attribute Details
#contents ⇒ Object
Payload to be sent across the wire
# File 'lib/sqs/message.rb', line 37
def contents
  @contents
end
#enqueued_at ⇒ Object
Returns the value of attribute enqueued_at.
# File 'lib/sqs/message.rb', line 39
def enqueued_at
  @enqueued_at
end
#queue_name ⇒ Object
Returns the value of attribute queue_name.
# File 'lib/sqs/message.rb', line 39
def queue_name
  @queue_name
end
#raw_message ⇒ Object
The raw RightAws message retrieved from SQS
# File 'lib/sqs/message.rb', line 34
def
end
#received_at ⇒ Object
Returns the value of attribute received_at.
# File 'lib/sqs/message.rb', line 39
def received_at
  @received_at
end
Class Method Details
.deserialize(str) ⇒ Object
Given a string, deserialize an object. By default this calls JSON.parse, but it could be overridden to use another deserialization method.
# File 'lib/sqs/message.rb', line 102
def self.deserialize str
  str.nil? ? nil : JSON.parse(str)
end
.peek(queue_name, number_to_view = 1, opts = {}) ⇒ Object
Retrieves one or more messages from the queue, but sets their visibility timeout to zero so that they can immediately be picked up by another worker. Useful for previewing messages on the queue.
- queue_name is the name of the SQS queue
- number_to_view sets the maximum number of messages to return. If set to 1, a single instance is returned; if > 1 an array is returned. Defaults to 1.
No options at this time.
# File 'lib/sqs/message.rb', line 60
def self.peek queue_name, number_to_view=1, opts={}
  self.receive(queue_name, number_to_view, :visibility => 0)
end
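To make the effect of a zero visibility timeout concrete, here is a small in-memory sketch. FakeQueue is a hypothetical stand-in written for this example; it is not part of SQS::Message or RightAws, and it takes the current time as an argument so the behavior is deterministic.

```ruby
# In-memory stand-in for a queue with visibility timeouts (hypothetical class).
class FakeQueue
  def initialize(bodies)
    # Each entry records the time at which it becomes visible again.
    @entries = bodies.map { |body| { :body => body, :visible_at => 0 } }
  end

  # Returns up to `count` visible bodies and hides them for `visibility` seconds.
  def receive(count, visibility, now)
    visible = @entries.select { |entry| entry[:visible_at] <= now }.first(count)
    visible.each { |entry| entry[:visible_at] = now + visibility }
    visible.map { |entry| entry[:body] }
  end
end

queue = FakeQueue.new(%w[a b])

peeked  = queue.receive(1, 0, 100)   # visibility 0: "a" stays visible
again   = queue.receive(1, 0, 100)   # so "a" is returned a second time
claimed = queue.receive(1, 30, 100)  # visibility 30: "a" is hidden until t = 130
next_up = queue.receive(1, 30, 100)  # "b" is delivered instead
```

With a timeout of zero the message is never hidden, so a peek does not reserve it; any other worker polling at the same moment can receive it too.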
.receive(queue_name, number_to_receive = 1, opts = {}) ⇒ Object
Retrieves one or more Message objects from an SQS queue.
- queue_name is the name of the SQS queue
- number_to_receive sets the maximum number of messages to return. If set to 1, a single instance is returned; if > 1 an array is returned. Defaults to 1.
Valid options:
- :visibility - sets the visibility timeout on the message in seconds
# File 'lib/sqs/message.rb', line 47
def self.receive queue_name, number_to_receive=1, opts={}
  = SQS::Queue[queue_name].(number_to_receive, opts[:visibility])
  received_at = Time.now.utc
  .map! { |m| self.(queue_name, m, received_at) }
  number_to_receive == 1 ? .first :
end
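Because the return type depends on the number requested, calling code may want to normalize the result. The sketch below mimics the documented convention with plain strings in place of Message objects; unwrap_result and as_list are hypothetical helper names for this example, not library methods.

```ruby
# Mirrors the documented convention: a single item when one is requested,
# otherwise an array.
def unwrap_result(items, number_requested)
  number_requested == 1 ? items.first : items
end

# Caller-side helper that always yields an array, whichever form came back.
def as_list(result)
  result.is_a?(Array) ? result : [result].compact
end

single = unwrap_result(['m1'], 1)        # a bare item
empty  = unwrap_result([], 1)            # nil when nothing was available
many   = unwrap_result(['m1', 'm2'], 2)  # an array

as_list(single).each { |m| puts m }  # prints m1
as_list(empty).each  { |m| puts m }  # prints nothing
as_list(many).each   { |m| puts m }  # prints m1 and m2
```

Under this convention a request for one message from an empty queue would come back as nil rather than an empty array, so a nil check (or a helper like as_list) is worth having before calling methods on the result.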
Instance Method Details
#dequeue! ⇒ Object
Call after retrieving to delete the message from SQS so that it cannot be retrieved by another client.
# File 'lib/sqs/message.rb', line 83
def dequeue!
  .delete
end
#id ⇒ Object
Returns the SQS id for this message; nil if the message has not yet been queued.
# File 'lib/sqs/message.rb', line 71
def id
  ? .id : nil
end
#queue! ⇒ Object
Serializes contents and pushes data onto the SQS queue.
# File 'lib/sqs/message.rb', line 76
def queue!
  @enqueued_at = Time.now.utc
  raise SQS::MessageSizeError, "Message exceeds 8k limit by #{sqs_message_body.size - 8_000} bytes" if .size > 8_000
  = SQS::Queue[queue_name].push()
end
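The size guard in queue! can be tried out in isolation. The sketch below is a standalone approximation: the 8,000 limit and the error text come from the source shown above, while MessageSizeError (a local stand-in for SQS::MessageSizeError), MAX_BODY_BYTES and check_size! are names invented for this example. It also measures bytesize rather than size, which is an assumption about intent; the two differ for multibyte characters.

```ruby
require 'json'

MAX_BODY_BYTES = 8_000  # limit taken from the queue! source

# Local stand-in for SQS::MessageSizeError (hypothetical definition).
class MessageSizeError < StandardError; end

# Returns the body unchanged, or raises if it is over the limit.
def check_size!(body)
  excess = body.bytesize - MAX_BODY_BYTES
  raise MessageSizeError, "Message exceeds 8k limit by #{excess} bytes" if excess > 0
  body
end

small = { 'var1' => 1 }.to_json
large = { 'var1' => 'x' * 9_000 }.to_json

check_size!(small)  # passes through untouched

begin
  check_size!(large)
rescue MessageSizeError => e
  puts e.message
end
```

Checking the serialized body rather than the raw contents matters because the serialized form is what actually counts against the limit.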
#serialize(object) ⇒ Object
Given an object, serialize it to a string. By default, this simply calls to_json on the object, but it could be overridden to serialize using YAML or Marshal.dump.
# File 'lib/sqs/message.rb', line 96
def serialize object
  object.to_json
end
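A minimal sketch of such an override follows. So that it runs without the library or an AWS account, it uses BaseMessage, a hypothetical stand-in exposing the same two hooks as documented for SQS::Message; with the real class you would subclass SQS::Message instead. The Base64 step is an assumption added here so that the binary Marshal output travels as plain text.

```ruby
require 'json'

# Stand-in with the same two hooks as documented for SQS::Message
# (hypothetical class; it makes no SQS calls).
class BaseMessage
  def serialize(object)
    object.to_json
  end

  def self.deserialize(str)
    str.nil? ? nil : JSON.parse(str)
  end
end

# Subclass that swaps JSON for Marshal, Base64-encoded to keep the body textual.
class MarshalMessage < BaseMessage
  def serialize(object)
    [Marshal.dump(object)].pack('m0')
  end

  def self.deserialize(str)
    str.nil? ? nil : Marshal.load(str.unpack('m0').first)
  end
end

payload  = { :var1 => 1, :var2 => 2 }
encoded  = MarshalMessage.new.serialize(payload)
restored = MarshalMessage.deserialize(encoded)

puts restored == payload  # prints true: symbol keys survive with Marshal
```

Two trade-offs are worth keeping in mind: Base64 inflates the body by roughly a third, which counts against the message size limit, and Marshal.load should only be used on queues whose producers you trust.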
#time_in_queue ⇒ Object
Returns the time the message spent on the queue, in seconds. Raises an ArgumentError if called on a message that has not been dequeued.
# File 'lib/sqs/message.rb', line 89
def time_in_queue
  raise ArgumentError, "This message has not been dequeued properly; either enqueued_at or received_at is nil" if enqueued_at.nil? || received_at.nil?
  # Elapsed time is the receive time minus the enqueue time.
  received_at - enqueued_at
end