Class: Cloudist::Message
- Includes:
- Encoding
- Defined in:
- lib/cloudist/message.rb
Instance Attribute Summary collapse
-
#body ⇒ Object
(also: #data)
readonly
Returns the value of attribute body.
-
#headers ⇒ Object
readonly
Returns the value of attribute headers.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#timestamp ⇒ Object
readonly
Returns the value of attribute timestamp.
Instance Method Summary collapse
- #created_at ⇒ Object
- #encoded ⇒ Object
-
#initialize(body, headers = {}) ⇒ Message
constructor
Expects body to be decoded.
- #inspect ⇒ Object
- #latency ⇒ Object
-
#publish(queue) ⇒ Object
Publishes this message to the exchange or queue Queue should be a Cloudist::Queue object responding to #publish.
- #published? ⇒ Boolean
- #published_at ⇒ Object
-
#reply(body, reply_headers = {}) ⇒ Object
Convenience method for replying Constructs a reply message and publishes it.
-
#set_queue_name_header(queue) ⇒ Object
This is so we can reply back to the sender.
- #update_headers(new_headers = {}) ⇒ Object
- #update_headers! ⇒ Object
- #update_published_date! ⇒ Object
Methods included from Encoding
Constructor Details
#initialize(body, headers = {}) ⇒ Message
Expects body to be decoded
8 9 10 11 12 13 14 15 16 17 |
# File 'lib/cloudist/message.rb', line 8 def initialize(body, headers = {}) @body = Hashie::Mash.new(body.dup) @id ||= headers[:message_id] || headers[:id] && headers.delete(:id) || UUID.generate @headers = Hashie::Mash.new(headers.dup) @timestamp = Time.now.utc.to_f update_headers(headers) end |
Instance Attribute Details
#body ⇒ Object (readonly) Also known as: data
Returns the value of attribute body.
5 6 7 |
# File 'lib/cloudist/message.rb', line 5 def body @body end |
#headers ⇒ Object (readonly)
Returns the value of attribute headers.
5 6 7 |
# File 'lib/cloudist/message.rb', line 5 def headers @headers end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
5 6 7 |
# File 'lib/cloudist/message.rb', line 5 def id @id end |
#timestamp ⇒ Object (readonly)
Returns the value of attribute timestamp.
5 6 7 |
# File 'lib/cloudist/message.rb', line 5 def @timestamp end |
Instance Method Details
#created_at ⇒ Object
70 71 72 |
# File 'lib/cloudist/message.rb', line 70 def created_at headers. ? Time.at(headers..to_f) : end |
#encoded ⇒ Object
82 83 84 |
# File 'lib/cloudist/message.rb', line 82 def encoded [encode(body), {:headers => headers}] end |
#inspect ⇒ Object
86 87 88 |
# File 'lib/cloudist/message.rb', line 86 def inspect "<#{self.class.name} id=#{id}>" end |
#latency ⇒ Object
78 79 80 |
# File 'lib/cloudist/message.rb', line 78 def latency (published_at.to_f - created_at.to_f) end |
#publish(queue) ⇒ Object
Publishes this message to the exchange or queue Queue should be a Cloudist::Queue object responding to #publish
49 50 51 52 53 54 55 |
# File 'lib/cloudist/message.rb', line 49 def publish(queue) raise ArgumentError, "Publish expects a Cloudist::Queue instance" unless queue.is_a?(Cloudist::Queue) set_queue_name_header(queue) update_published_date! update_headers! queue.publish(self) end |
#published? ⇒ Boolean
66 67 68 |
# File 'lib/cloudist/message.rb', line 66 def published? @published ||= !!@headers.published_on end |
#published_at ⇒ Object
74 75 76 |
# File 'lib/cloudist/message.rb', line 74 def published_at headers[:published_on] ? Time.at(headers[:published_on].to_f) : end |
#reply(body, reply_headers = {}) ⇒ Object
Convenience method for replying Constructs a reply message and publishes it
38 39 40 41 42 43 44 45 |
# File 'lib/cloudist/message.rb', line 38 def reply(body, reply_headers = {}) raise RuntimeError, "Cannot reply to an unpublished message" unless published? msg = Message.new(body, reply_headers) msg.set_reply_header reply_q = Cloudist::ReplyQueue.new(headers[:queue_name]) msg.publish(reply_q) end |
#set_queue_name_header(queue) ⇒ Object
This is so we can reply back to the sender
62 63 64 |
# File 'lib/cloudist/message.rb', line 62 def set_queue_name_header(queue) update_headers(:queue_name => queue.name) end |
#update_headers(new_headers = {}) ⇒ Object
21 22 23 24 |
# File 'lib/cloudist/message.rb', line 21 def update_headers(new_headers = {}) update_headers! headers.merge!(new_headers) end |
#update_headers! ⇒ Object
26 27 28 29 30 31 32 33 34 |
# File 'lib/cloudist/message.rb', line 26 def update_headers! headers[:ttl] ||= Cloudist::DEFAULT_TTL headers[:timestamp] = headers[:message_id] ||= id headers[:message_type] = 'message' headers[:queue_name] ||= 'test' headers.each { |k,v| headers[k] = v.to_s } end |
#update_published_date! ⇒ Object
57 58 59 |
# File 'lib/cloudist/message.rb', line 57 def update_published_date! headers[:published_on] = Time.now.utc.to_f end |