Class: Cloudist::Payload

Inherits:
Object show all
Includes:
Encoding, Utils
Defined in:
lib/cloudist/payload.rb,
lib/cloudist/payload_old.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Encoding

#decode, #encode

Methods included from Utils

#decode_json, #decode_message, #encode_message, #generate_name_for_instance, #generate_queue, #generate_reply_to, #generate_sym, #log_prefix, #reply_prefix, #stats_prefix

Constructor Details

#initialize(body, headers = {}, publish_opts = {}) ⇒ Payload

Returns a new instance of Payload.



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/cloudist/payload.rb', line 8

# Builds a payload from a message body and its transport headers.
#
# A String body is passed through #decode first; the result is then passed
# through #decode once more before being wrapped in a Hashie::Mash.
# NOTE(review): the second #decode is presumably a no-op for non-String
# input -- confirm against Encoding#decode before simplifying.
#
# @param body [String, Object] raw or already-decoded message body
# @param headers [Hash] message headers; wrapped in a Hashie::Mash
def initialize(body, headers = {})
  @published = false
  @amqp_headers = {}
  @timestamp = Time.now.to_f

  decoded = body.is_a?(String) ? decode(body) : body
  @headers = Hashie::Mash.new(headers)
  @body = Hashie::Mash.new(decode(decoded))

  # Fills in default headers; may lower @timestamp to the published_on header.
  parse_headers!
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(meth, *args, &blk) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/cloudist/payload.rb', line 80

# Exposes body keys as reader methods and "<key>?" as presence checks.
#
# * When the body has a key named after the method, that value is returned.
# * Otherwise, for a method name ending in "?", returns whether the body has
#   the key that precedes the "?".
# * Anything else is handed to super.
def method_missing(meth, *args, &blk)
  name = meth.to_s
  return body[meth] if body.has_key?(name)

  predicate_key = name.match(/(.+)(?:\?$)/).to_a.last
  return body.has_key?(predicate_key.to_s) if predicate_key

  super
end

Instance Attribute Details

#amqp_headers ⇒ Object (readonly)

Returns the value of attribute amqp_headers.



6
7
8
# File 'lib/cloudist/payload.rb', line 6

# Plain reader for @amqp_headers, which #initialize sets to an empty Hash.
def amqp_headers
  @amqp_headers
end

#body ⇒ Object (readonly)

Returns the value of attribute body.



6
7
8
# File 'lib/cloudist/payload.rb', line 6

# Plain reader for @body; #initialize stores the decoded message body there,
# wrapped in a Hashie::Mash.
def body
  @body
end

#headers ⇒ Object (readonly)

Returns the value of attribute headers.



6
7
8
# File 'lib/cloudist/payload.rb', line 6

# Plain reader for @headers; #initialize stores the given headers there,
# wrapped in a Hashie::Mash.
def headers
  @headers
end

#publish_opts ⇒ Object (readonly)

Returns the value of attribute publish_opts.



5
6
7
# File 'lib/cloudist/payload_old.rb', line 5

# Plain reader for @publish_opts.
# NOTE(review): none of the code visible here assigns @publish_opts --
# presumably the payload_old.rb constructor does; confirm.
def publish_opts
  @publish_opts
end

#timestamp ⇒ Object (readonly)

Returns the value of attribute timestamp.



6
7
8
# File 'lib/cloudist/payload.rb', line 6

# Plain reader for @timestamp. #initialize sets it to Time.now.to_f, and
# #parse_headers! replaces it with headers[:published_on] when that value is
# earlier.
def timestamp
  @timestamp
end

Instance Method Details

#[](key) ⇒ Object



76
77
78
# File 'lib/cloudist/payload.rb', line 76

# Looks up a value in the body by key.
#
# @param key [#to_s] key name; converted with #to_s before the lookup
# @return [Object] whatever the body returns for that key
def [](key)
  lookup = key.to_s
  body[lookup]
end

#create_event_hash ⇒ Object



118
119
120
121
122
# File 'lib/cloudist/payload_old.rb', line 118

# Produces a new event identifier by delegating to UUID.generate.
def create_event_hash
  # Earlier approach, left disabled: an MD5 digest of time, object_id and a
  # random number.
  # s = Time.now.to_s + object_id.to_s + rand(100).to_s
  # Digest::MD5.hexdigest(s)
  UUID.generate
end

#encoded_headers ⇒ Object



58
59
60
61
62
# File 'lib/cloudist/payload.rb', line 58

# Returns a duplicate of the headers in which every value has been converted
# with #to_s. Values are reassigned on the duplicate only.
#
# @return [Object] the stringified copy of #headers
def encoded_headers
  headers.dup.tap do |copy|
    copy.each { |name, value| copy[name] = value.to_s }
  end
end

#event_hash ⇒ Object



114
115
116
# File 'lib/cloudist/payload_old.rb', line 114

# Memoised event hash: taken from the "event_hash" header when present,
# otherwise generated via #create_event_hash. The result is cached in
# @event_hash.
def event_hash
  return @event_hash if @event_hash

  @event_hash = headers["event_hash"] || create_event_hash
end

#extract_custom_headers ⇒ Object

Raises:

  • (StaleHeadersError)

53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/cloudist/payload_old.rb', line 53

# Fills in the transport headers and converts every header value to a String.
#
# Defaults are applied only to headers that are not already set:
# * :published_on -- body[:published_on] (removed from a Hash body), else the
#   current UTC time in epoch seconds
# * :ttl -- body['ttl'] (removed from a Hash body), else Cloudist::DEFAULT_TTL
# * :event_hash and :message_id -- #id
# :timestamp is always overwritten with #timestamp.
#
# NOTE(review): :published_on is deleted using a Symbol key while 'ttl' uses a
# String key -- confirm both match the body type actually in use.
#
# @return [Object] the mutated headers
# @raise [StaleHeadersError] if the payload has already been published
def extract_custom_headers
  if published?
    raise StaleHeadersError, "Headers cannot be changed because payload has already been published"
  end

  unless headers[:published_on]
    from_body = body.delete(:published_on) if body.is_a?(Hash)
    headers[:published_on] = from_body || Time.now.utc.to_i
  end

  unless headers[:ttl]
    body_ttl = body.delete('ttl') if body.is_a?(Hash)
    headers[:ttl] = body_ttl || Cloudist::DEFAULT_TTL
  end

  headers[:timestamp] = timestamp

  # The event hash is carried through publish/reply chains; the message id is
  # meant to be unique for each published/received message pair.
  headers[:event_hash] = id unless headers[:event_hash]
  headers[:message_id] = id unless headers[:message_id]

  # AMQP header values are sent as strings to sidestep integer quirks.
  headers.each { |name, value| headers[name] = value.to_s }

  headers
end

#find_or_create_id ⇒ Object



21
22
23
24
25
26
27
# File 'lib/cloudist/payload.rb', line 21

# Returns the "message_id" header when it is set; otherwise asks
# UUID.generate for a new identifier. Nothing is stored here, so a new
# identifier is requested on each call until the header is populated.
def find_or_create_id
  return headers.message_id if headers["message_id"]

  UUID.generate
end

#formatted ⇒ Object

Return message formatted as JSON and headers ready for transport in array



24
25
26
27
28
# File 'lib/cloudist/payload_old.rb', line 24

# Returns the message ready for transport as a two-element array:
# the body encoded via #encode_message, followed by #publish_opts.
# Headers are refreshed through #update_headers before encoding.
#
# @return [Array] [encoded body, publish options]
def formatted
  update_headers

  encoded = encode_message(body)
  [encoded, publish_opts]
end

#freeze! ⇒ Object



43
44
45
46
# File 'lib/cloudist/payload_old.rb', line 43

# Freezes the headers and then the body.
#
# @return [Object] the value returned by freezing the body
def freeze!
  [headers, body].map(&:freeze).last
end

#frozen? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/cloudist/payload_old.rb', line 39

# Reports whether the headers are frozen. Only the headers are checked,
# although #freeze! freezes the body as well.
#
# @return [Boolean]
def frozen?
  headers.frozen?
end

#id ⇒ Object



29
30
31
# File 'lib/cloudist/payload.rb', line 29

# Message identifier; delegates to #find_or_create_id.
def id
  find_or_create_id
end

#id=(new_id) ⇒ Object



34
35
36
37
# File 'lib/cloudist/payload_old.rb', line 34

# Stores the given id (converted with #to_s) in @id and then refreshes the
# headers via #update_headers.
# NOTE(review): the #id reader shown from payload.rb returns
# #find_or_create_id and does not read @id -- confirm which definition wins.
#
# @param new_id [#to_s] the identifier to assign
def id=(new_id)
  @id = new_id.to_s
  update_headers
end

#message_type ⇒ Object



72
73
74
# File 'lib/cloudist/payload.rb', line 72

# Returns headers.message_type. #parse_headers! defaults the :message_type
# header to 'reply' when neither the headers nor the body supply one.
def message_type
  headers.message_type
end

#parse_custom_headers ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/cloudist/payload_old.rb', line 79

# Returns a duplicate of the headers with :published_on and :ttl coerced via
# #to_i. A ttl whose conversion raises, or that converts to 0, becomes -1.
#
# @return [Object] the coerced copy, or an empty Hash when there are no headers
def parse_custom_headers
  return {} unless headers

  parsed = headers.dup
  parsed[:published_on] = parsed[:published_on].to_i

  ttl = begin
    parsed[:ttl].to_i
  rescue StandardError
    -1
  end
  parsed[:ttl] = ttl == 0 ? -1 : ttl

  parsed
end

#parse_headers! ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/cloudist/payload.rb', line 37

# Normalises the headers in place and returns them.
#
# * :published_on -- kept if set; else body["timestamp"] (removed from the
#   body) or #timestamp; then coerced with #to_f
# * :message_type -- kept if set; else body["message_type"] (removed from the
#   body) or 'reply'
# * :ttl -- kept if set, else Cloudist::DEFAULT_TTL; then coerced with #to_i,
#   becoming -1 when the conversion raises or yields 0
# * :message_id -- always overwritten with #id
#
# @timestamp is lowered to :published_on when :published_on is earlier, so a
# received payload keeps its original timestamp.
def parse_headers!
  unless headers[:published_on]
    headers[:published_on] = body.delete("timestamp") || timestamp
  end
  unless headers[:message_type]
    headers[:message_type] = body.delete("message_type") || 'reply'
  end
  headers[:ttl] = Cloudist::DEFAULT_TTL unless headers[:ttl]
  headers[:message_id] = id

  headers[:published_on] = headers[:published_on].to_f

  ttl = begin
    headers[:ttl].to_i
  rescue StandardError
    -1
  end
  headers[:ttl] = ttl == 0 ? -1 : ttl

  @timestamp = headers[:published_on] if timestamp > headers[:published_on]

  headers
end

#parse_message(raw) ⇒ Object



124
125
126
127
128
# File 'lib/cloudist/payload_old.rb', line 124

# Decodes a raw message by delegating to #decode_message (listed among the
# methods included from Utils).
#
# @param raw [Object] the raw message handed to #decode_message
def parse_message(raw)
  # Previous JSON-based implementation, left disabled:
  # return { } unless raw
  # decode_json(raw)
  decode_message(raw)
end

#publish ⇒ Object



138
139
140
141
142
# File 'lib/cloudist/payload_old.rb', line 138

# Marks the payload as published and freezes it via #freeze!.
# Does nothing (and returns nil) when the payload was already published.
def publish
  unless published?
    @published = true
    freeze!
  end
end

#published? ⇒ Boolean

Returns:

  • (Boolean)


134
135
136
# File 'lib/cloudist/payload_old.rb', line 134

# Whether the payload has been published: true when @published == true.
# #initialize starts the flag at false and #publish sets it to true.
#
# @return [Boolean]
def published?
  @published == true
end

#reply_name(queue_name) ⇒ Object



101
102
103
104
# File 'lib/cloudist/payload_old.rb', line 101

# Returns Utils.reply_prefix(queue_name) for the given queue name.
#
# @param queue_name [Object] passed straight to Utils.reply_prefix
def reply_name(queue_name)
  # Earlier id-suffixed naming, left disabled:
  # "#{queue_name}.#{id}"
  Utils.reply_prefix(queue_name)
end

#reply_to ⇒ Object



68
69
70
# File 'lib/cloudist/payload.rb', line 68

# Returns headers.reply_to; #set_reply_to is what writes the :reply_to header.
def reply_to
  headers.reply_to
end

#set_master_queue_name(queue_name) ⇒ Object



97
98
99
# File 'lib/cloudist/payload_old.rb', line 97

# Stores the given queue name in the :master_queue header.
#
# @param queue_name [Object] value written to headers[:master_queue]
def set_master_queue_name(queue_name)
  headers[:master_queue] = queue_name
end

#set_reply_to(queue_name) ⇒ Object



64
65
66
# File 'lib/cloudist/payload.rb', line 64

# Sets the :reply_to header to reply_prefix(queue_name) (reply_prefix is
# listed among the methods included from Utils).
#
# @param queue_name [Object] queue name handed to #reply_prefix
def set_reply_to(queue_name)
  headers[:reply_to] = reply_prefix(queue_name)
end

#to_a ⇒ Object



33
34
35
# File 'lib/cloudist/payload.rb', line 33

# Returns the payload as a two-element array: the body passed through
# #encode, followed by an options Hash holding #encoded_headers under
# :headers.
#
# @return [Array] [encoded body, { :headers => encoded headers }]
def to_a
  encoded_body = encode(body)
  options = { headers: encoded_headers }
  [encoded_body, options]
end

#update_headers ⇒ Object



48
49
50
51
# File 'lib/cloudist/payload_old.rb', line 48

# Recomputes the headers via #extract_custom_headers and merges them into
# publish_opts[:headers], creating that Hash first when it is missing.
#
# @return [Hash] the headers Hash that received the merge
def update_headers
  extracted = extract_custom_headers
  target = (publish_opts[:headers] ||= {})
  target.merge!(extracted)
end