Class: Cloudist::Message

Inherits:
Object show all
Includes:
Encoding
Defined in:
lib/cloudist/message.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Encoding

#decode, #encode

Constructor Details

#initialize(body, headers = {}) ⇒ Message

Expects body to be decoded



8
9
10
11
12
13
14
15
16
17
# File 'lib/cloudist/message.rb', line 8

# Builds a message from an already-decoded body and optional headers.
#
# The id is taken from headers[:message_id], else from headers[:id]
# (which is then dropped from the headers), else a fresh UUID is generated.
#
# @param body [Hash] decoded payload; it is duplicated before being wrapped
# @param headers [Hash] initial headers; the caller's hash is left untouched
def initialize(body, headers = {})
  @body = Hashie::Mash.new(body.dup)

  # Work on a private copy: removing :id below must not alter the hash the
  # caller handed in.
  own_headers = headers.dup

  @id ||= own_headers[:message_id] || own_headers[:id] && own_headers.delete(:id) || UUID.generate
  @headers = Hashie::Mash.new(own_headers)

  @timestamp = Time.now.utc.to_f

  # Applies the default headers first, then re-merges the supplied ones so
  # explicitly passed values take precedence.
  update_headers(own_headers)
end

Instance Attribute Details

#body ⇒ Object (readonly) Also known as: data

Returns the value of attribute body.



5
6
7
# File 'lib/cloudist/message.rb', line 5

# Reader for the message body.
#
# @return [Object] the value held in @body
def body; @body; end

#headers ⇒ Object (readonly)

Returns the value of attribute headers.



5
6
7
# File 'lib/cloudist/message.rb', line 5

# Reader for the message headers.
#
# @return [Object] the value held in @headers
def headers; @headers; end

#id ⇒ Object (readonly)

Returns the value of attribute id.



5
6
7
# File 'lib/cloudist/message.rb', line 5

# Reader for the message id.
#
# @return [Object] the value held in @id
def id; @id; end

#timestamp ⇒ Object (readonly)

Returns the value of attribute timestamp.



5
6
7
# File 'lib/cloudist/message.rb', line 5

# Reader for the creation timestamp recorded on this instance.
#
# @return [Object] the value held in @timestamp
def timestamp; @timestamp; end

Instance Method Details

#created_at ⇒ Object



70
71
72
# File 'lib/cloudist/message.rb', line 70

# Creation time of the message.
#
# When the headers carry a timestamp it is converted to a Time; otherwise
# the instance's own #timestamp is returned unchanged (not wrapped in Time).
#
# @return [Time, Object]
def created_at
  stamp = headers.timestamp
  return timestamp unless stamp

  Time.at(stamp.to_f)
end

#encoded ⇒ Object



82
83
84
# File 'lib/cloudist/message.rb', line 82

# Wire representation of the message.
#
# @return [Array] two elements: the body passed through #encode, and an
#   options hash holding the headers under :headers
def encoded
  payload = encode(body)
  options = { :headers => headers }
  [payload, options]
end

#inspect ⇒ Object



86
87
88
# File 'lib/cloudist/message.rb', line 86

def inspect
  "<#{self.class.name} id=#{id}>"
end

#latency ⇒ Object



78
79
80
# File 'lib/cloudist/message.rb', line 78

# Difference between the publish time and the creation time, both
# converted with #to_f before subtracting.
#
# @return [Float]
def latency
  published = published_at.to_f
  created = created_at.to_f
  published - created
end

#publish(queue) ⇒ Object

Publishes this message to the exchange or queue. Queue should be a Cloudist::Queue object responding to #publish.

Raises:

  • (ArgumentError)


49
50
51
52
53
54
55
# File 'lib/cloudist/message.rb', line 49

# Publishes this message through the given queue.
#
# Before handing itself to the queue, the message records the queue name,
# stamps the publish time and refreshes its default headers, in that order.
#
# @param queue [Cloudist::Queue] destination; must be a Cloudist::Queue
# @return [Object] whatever queue.publish returns
# @raise [ArgumentError] if queue is not a Cloudist::Queue
def publish(queue)
  unless queue.is_a?(Cloudist::Queue)
    raise ArgumentError, "Publish expects a Cloudist::Queue instance"
  end

  set_queue_name_header(queue)
  update_published_date!
  update_headers!

  queue.publish(self)
end

#published? ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/cloudist/message.rb', line 66

# Whether the headers carry a published_on value.
#
# A truthy answer is cached in @published; a falsy one is recomputed on the
# next call.
#
# @return [Boolean]
def published?
  return @published if @published

  @published = !!@headers.published_on
end

#published_at ⇒ Object



74
75
76
# File 'lib/cloudist/message.rb', line 74

# Publish time of the message.
#
# When the :published_on header is set it is converted to a Time; otherwise
# the instance's own #timestamp is returned unchanged (not wrapped in Time).
#
# @return [Time, Object]
def published_at
  stamp = headers[:published_on]
  return timestamp unless stamp

  Time.at(stamp.to_f)
end

#reply(body, reply_headers = {}) ⇒ Object

Convenience method for replying. Constructs a reply message and publishes it.

Raises:

  • (RuntimeError)


38
39
40
41
42
43
44
45
# File 'lib/cloudist/message.rb', line 38

# Convenience method for replying: builds a new Message from the given body
# and headers and publishes it on a Cloudist::ReplyQueue named after this
# message's :queue_name header.
#
# NOTE(review): #set_reply_header is called on the new message but is not
# among the methods visible here - confirm it is defined elsewhere.
#
# @param body [Object] body of the reply message
# @param reply_headers [Hash] headers for the reply message
# @return [Object] whatever Message#publish returns
# @raise [RuntimeError] if this message has not been published
def reply(body, reply_headers = {})
  unless published?
    raise RuntimeError, "Cannot reply to an unpublished message"
  end

  response = Message.new(body, reply_headers)
  response.set_reply_header
  response.publish(Cloudist::ReplyQueue.new(headers[:queue_name]))
end

#set_queue_name_header(queue) ⇒ Object

This is so we can reply back to the sender



62
63
64
# File 'lib/cloudist/message.rb', line 62

# Records the queue's name in the :queue_name header so the receiver can
# reply back to the sender.
#
# @param queue [#name] queue whose name is stored
# @return [Object] whatever #update_headers returns
def set_queue_name_header(queue)
  name_header = { :queue_name => queue.name }
  update_headers(name_header)
end

#update_headers(new_headers = {}) ⇒ Object



21
22
23
24
# File 'lib/cloudist/message.rb', line 21

# Refreshes the default headers, then merges the supplied ones on top.
#
# The defaults are applied first, so values in new_headers win and are
# stored as given.
#
# @param new_headers [Hash] headers to merge into the message headers
# @return [Object] the result of headers.merge!
def update_headers(new_headers = {})
  update_headers!

  current = headers
  current.merge!(new_headers)
end

#update_headers! ⇒ Object



26
27
28
29
30
31
32
33
34
# File 'lib/cloudist/message.rb', line 26

# Fills in the standard headers and normalises every header value to a String.
#
# :ttl, :message_id and :queue_name are only set when missing; :timestamp
# and :message_type are always overwritten.
#
# @return [Object] the result of headers.each
def update_headers!
  current = headers

  # Defaults that respect values already present.
  current[:ttl] ||= Cloudist::DEFAULT_TTL
  # Always refreshed from the instance.
  current[:timestamp] = timestamp
  current[:message_id] ||= id
  current[:message_type] = 'message'
  current[:queue_name] ||= 'test'

  # Only existing keys are reassigned, so iterating while writing is safe.
  current.each do |key, value|
    current[key] = value.to_s
  end
end

#update_published_date! ⇒ Object



57
58
59
# File 'lib/cloudist/message.rb', line 57

# Stamps the :published_on header with the current UTC time as a Float
# number of seconds.
#
# @return [Float] the value that was stored
def update_published_date!
  now = Time.now.utc
  headers[:published_on] = now.to_f
end