Class: SQS::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/sqs/message.rb

Overview

SQS::Message provides a light wrapper over the RightAWS SQS API. It embeds the message payload in a JSON container to allow enqueue time and time_in_queue calculations.

Note that by default, JSON serialization is used to encode contents. This means that contents must be JSON-serializable (i.e. no cyclical objects, singleton strings, etc.) If this is problematic, the serialization mechanism can be customized by overriding the serialize and self.deserialize methods.

To use, simply instantiate, set the contents and call queue! For example:

m = SQS::Message.new 'my_test_queue'
m.contents = {:var1 => 1, :var2 => 2}
m.queue!

pushes a message containing => 1, :var2 => 2 to the ‘my_test_queue’ queue on SQS

To retrieve the message, simply call:

m = SQS::Message.receive 'my_test_queue'

You can then inspect the contents

m.contents # returns {'var1' => 1, 'var2' => 2 }

remove from the SQS queue

m.dequeue!

or check the time in queue

m.time_in_queue

You can also retrieve the SQS id for the message with the id method.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(new_queue_name) ⇒ Message

Initialize with the name of the name of the SQS queue. When queue! is called on an instance, data will be pushed to this queue.



66
67
68
# File 'lib/sqs/message.rb', line 66

def initialize new_queue_name
  @queue_name = new_queue_name
end

Instance Attribute Details

#contentsObject

Payload to be sent across the wire



37
38
39
# File 'lib/sqs/message.rb', line 37

def contents
  @contents
end

#enqueued_atObject

Returns the value of attribute enqueued_at.



39
40
41
# File 'lib/sqs/message.rb', line 39

def enqueued_at
  @enqueued_at
end

#queue_nameObject

Returns the value of attribute queue_name.



39
40
41
# File 'lib/sqs/message.rb', line 39

def queue_name
  @queue_name
end

#raw_messageObject

The raw RightAws message retrieved from SQS



34
35
36
# File 'lib/sqs/message.rb', line 34

def raw_message
  @raw_message
end

#received_atObject

Returns the value of attribute received_at.



39
40
41
# File 'lib/sqs/message.rb', line 39

def received_at
  @received_at
end

Class Method Details

.deserialize(str) ⇒ Object

Given a string, deserialize an object. By default this calls JSON.parse but could be overriden to use another deserialization method.



102
103
104
# File 'lib/sqs/message.rb', line 102

def self.deserialize str
  str.nil? ? nil : JSON.parse(str)
end

.peek(queue_name, number_to_view = 1, opts = {}) ⇒ Object

Retrives one or more messages from the queue, but sets their visiblity to zero so that they can immediately be picked up by another worker. Useful for previewing messages on the queue.

  • queue_name is the name of the SQS queue

  • number_to_view sets the maximum number of messages to return. If set to 1, a single instance is returned; if > 1 an array is returned. Defaults to 1.

No options at this time.



60
61
62
# File 'lib/sqs/message.rb', line 60

def self.peek queue_name, number_to_view=1, opts={}
  self.receive(queue_name, number_to_view, :visibility => 0)
end

.receive(queue_name, number_to_receive = 1, opts = {}) ⇒ Object

Retrieves one or more Message objects from an SQS queue

  • queue_name is the name of the SQS queue

  • number_to_receive sets the maximum number of messages to return. If set to 1, a single instance is returned; if > 1 an array is returned. Defaults to 1.

Valid options:

  • :visiblity - sets the visiblity timeout on the message in seconds



47
48
49
50
51
52
53
# File 'lib/sqs/message.rb', line 47

def self.receive queue_name, number_to_receive=1, opts={}
  messages = SQS::Queue[queue_name].receive_messages(number_to_receive, opts[:visibility])
  received_at = Time.now.utc
  messages.map! { |m| self.create_from_raw_message(queue_name, m, received_at) } 

  number_to_receive == 1 ? messages.first : messages
end

Instance Method Details

#dequeue!Object

Call after retrieving to delete the message from SQS so that it cannot be retrieved by another client.



83
84
85
# File 'lib/sqs/message.rb', line 83

def dequeue!
  raw_message.delete
end

#idObject

Returns the SQS id for this message; nil if the message has not yet been queued.



71
72
73
# File 'lib/sqs/message.rb', line 71

def id
  raw_message ? raw_message.id : nil
end

#queue!Object

Serializes contents and pushes data onto the SQS queue.



76
77
78
79
80
# File 'lib/sqs/message.rb', line 76

def queue!
  @enqueued_at = Time.now.utc
  raise SQS::MessageSizeError, "Message exceeds 8k limit by #{sqs_message_body.size - 8_000} bytes" if sqs_message_body.size > 8_000
  raw_message = SQS::Queue[queue_name].push(sqs_message_body)
end

#serialize(object) ⇒ Object

Given an object, serialize it to a string. By default, this simply calls to_json on the object, but could be overriden to serialize using YAML or Marshal.dump.



96
97
98
# File 'lib/sqs/message.rb', line 96

def serialize object
  object.to_json
end

#time_in_queueObject

Returns time message spent on queue in seconds. Raises an ArgumentError if called on a message that has not been dequeued.

Raises:

  • (ArgumentError)


89
90
91
92
# File 'lib/sqs/message.rb', line 89

def time_in_queue
  raise ArgumentError, "This message has not been dequeued properly; either enqueued_at or received_at is nil" if enqueued_at.nil? || received_at.nil?
  enqueued_at - received_at
end