Class: Cloudist::Payload
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Encoding
#decode, #encode
Methods included from Utils
#decode_json, #decode_message, #encode_message, #generate_name_for_instance, #generate_queue, #generate_reply_to, #generate_sym, #log_prefix, #reply_prefix, #stats_prefix
Constructor Details
#initialize(body, headers = {}, publish_opts = {}) ⇒ Payload
Returns a new instance of Payload.
8
9
10
11
12
13
14
15
16
17
18
19
|
# File 'lib/cloudist/payload.rb', line 8
# Builds a payload from a message body and its headers.
#
# @param body [String, Hash] message body; a String is passed through +decode+ first
# @param headers [Hash] message headers, wrapped in a Hashie::Mash
#
# NOTE(review): the second parameter name and the argument of the second
# Hashie::Mash.new were blank in the rendered listing. `headers` is restored
# here because the value is stored in @headers -- confirm against
# lib/cloudist/payload.rb. The listing's line gutter spans more source lines
# than are visible, so also verify that no statement was dropped.
def initialize(body, headers = {})
  @published = false
  @timestamp = Time.now.to_f
  body = decode(body) if body.is_a?(String)
  # +decode+ is applied a second time here, exactly as in the original listing.
  @body = Hashie::Mash.new(decode(body))
  @headers = Hashie::Mash.new(headers)
  @amqp_headers = {}
end
|
Dynamic Method Handling
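This class handles dynamic methods through method_missing: a call named after a key in the payload body returns that key's value, and the same name with a trailing `?` reports whether the key is present.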
#method_missing(meth, *args, &blk) ⇒ Object
80
81
82
83
84
85
86
87
88
|
# File 'lib/cloudist/payload.rb', line 80
# Resolves otherwise-undefined calls against the payload body.
#
# * +payload.foo+  -- returns +body[:foo]+ when the body has the key "foo".
# * +payload.foo?+ -- returns whether the body has the key "foo".
# * anything else is handed to +super+.
#
# @param meth [Symbol] name of the missing method
# @param args [Array] arguments of the original call (only forwarded to super)
# @param blk [Proc] block of the original call (only forwarded to super)
# @return [Object] the body value, or true/false for the predicate form
def method_missing(meth, *args, &blk)
  name = meth.to_s
  return body[meth] if body.has_key?(name)

  # For "foo?" the last element of the match data is the capture "foo";
  # for names without a trailing "?" there is no match and the result is nil.
  predicate_key = name.match(/(.+)(?:\?$)/).to_a.last
  return body.has_key?(predicate_key.to_s) if predicate_key

  super
end
|
Instance Attribute Details
Returns the value of attribute amqp_headers.
6
7
8
|
# File 'lib/cloudist/payload.rb', line 6
# Returns the value of attribute amqp_headers.
#
# NOTE(review): the method name was blank in the rendered listing; it is
# restored from the attribute description and the instance variable it reads.
#
# @return [Object] the current value of @amqp_headers
def amqp_headers
  @amqp_headers
end
|
Returns the value of attribute body.
6
7
8
|
# File 'lib/cloudist/payload.rb', line 6
# Reader for the payload body.
#
# @return [Object] the current value of @body
def body
  @body
end
|
Returns the value of attribute headers.
6
7
8
|
# File 'lib/cloudist/payload.rb', line 6
# Returns the value of attribute headers.
#
# NOTE(review): the method name was blank in the rendered listing; it is
# restored from the attribute description and the instance variable it reads.
#
# @return [Object] the current value of @headers
def headers
  @headers
end
|
#publish_opts ⇒ Object
Returns the value of attribute publish_opts.
5
6
7
|
# File 'lib/cloudist/payload_old.rb', line 5
# Reader for the publish options.
#
# @return [Object] the current value of @publish_opts
def publish_opts
  @publish_opts
end
|
#timestamp ⇒ Object
Returns the value of attribute timestamp.
6
7
8
|
# File 'lib/cloudist/payload.rb', line 6
# Reader for the payload timestamp.
#
# @return [Object] the current value of @timestamp
def timestamp
  @timestamp
end
|
Instance Method Details
76
77
78
|
# File 'lib/cloudist/payload.rb', line 76
# Looks up a value in the payload body by key.
#
# @param key [#to_s] the key; it is converted to a String before the lookup
# @return [Object] whatever +body[]+ returns for the stringified key
def [](key)
  lookup_key = key.to_s
  self.body[lookup_key]
end
|
#create_event_hash ⇒ Object
118
119
120
121
122
|
# File 'lib/cloudist/payload_old.rb', line 118
# Produces a fresh identifier by delegating to UUID.generate.
#
# @return [Object] whatever UUID.generate returns
def create_event_hash
  UUID.generate
end
|
58
59
60
61
62
|
# File 'lib/cloudist/payload.rb', line 58
# Duplicates a hash-like object, replaces every value in the copy with its
# string form (+to_s+), and returns the copy.
#
# NOTE(review): the method name and the receiver of +.dup+ were stripped from
# the rendered listing; the receiver is presumably the +headers+ attribute --
# confirm against lib/cloudist/payload.rb (line 58).
def
h = .dup
h.each { |k,v| h[k] = v.to_s }
return h
end
|
#event_hash ⇒ Object
114
115
116
|
# File 'lib/cloudist/payload_old.rb', line 114
# Returns the event hash for this payload, memoized in @event_hash.
#
# The value is taken from the "event_hash" header when present; otherwise a
# new one is produced by +create_event_hash+.
#
# NOTE(review): the receiver of +["event_hash"]+ was stripped from the rendered
# listing, which left a bare (always truthy) array literal, so the fallback
# could never run. `headers` is restored as the receiver -- confirm against
# lib/cloudist/payload_old.rb.
#
# @return [Object] the header value or a newly created event hash
def event_hash
  @event_hash ||= headers["event_hash"] || create_event_hash
end
|
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/cloudist/payload_old.rb', line 53
# Fills in default entries on a hash-like object and stringifies its values.
#
# Raises StaleHeadersError when +published?+ is true. Otherwise:
# * :published_on defaults to the body's :published_on entry (deleted from the
#   body, only when the body is a Hash) or else the current UTC time in seconds;
# * :ttl defaults to the body's 'ttl' entry (deleted from the body, only when
#   the body is a Hash) or else Cloudist::DEFAULT_TTL;
# * :timestamp is always overwritten with +timestamp+;
# * :event_hash and :message_id default to +id+;
# * finally every value is replaced by its +to_s+ form.
#
# NOTE(review): the method name and the receiver of every +[...]+ / +.each+
# were stripped from the rendered listing; the receiver is presumably the
# +headers+ attribute -- confirm against lib/cloudist/payload_old.rb (line 53).
def
raise StaleHeadersError, "Headers cannot be changed because payload has already been published" if published?
[:published_on] ||= body.is_a?(Hash) && body.delete(:published_on) || Time.now.utc.to_i
[:ttl] ||= body.is_a?(Hash) && body.delete('ttl') || Cloudist::DEFAULT_TTL
[:timestamp] = timestamp
[:event_hash] ||= id
[:message_id] ||= id
.each { |k,v| [k] = v.to_s }
end
|
#find_or_create_id ⇒ Object
21
22
23
24
25
26
27
|
# File 'lib/cloudist/payload.rb', line 21
# Returns the message id carried in the headers, or a new UUID when the
# headers have no "message_id" entry.
#
# NOTE(review): the receiver of +["message_id"]+ and +.message_id+ was stripped
# from the rendered listing; `headers` is restored because @headers is the
# hash-like attribute of this class -- confirm against lib/cloudist/payload.rb.
#
# @return [Object] the existing message id or the result of UUID.generate
def find_or_create_id
  if headers["message_id"]
    headers.message_id
  else
    UUID.generate
  end
end
|
Returns the message body encoded with encode_message, together with the publish options, as a two-element array ready for transport.
24
25
26
27
28
|
# File 'lib/cloudist/payload_old.rb', line 24
# Packs the payload for transport.
#
# @return [Array] a two-element array: the body passed through
#   +encode_message+, followed by +publish_opts+
def formatted
  encoded_body = encode_message(body)
  [encoded_body, publish_opts]
end
|
43
44
45
46
|
# File 'lib/cloudist/payload_old.rb', line 43
# Freezes the headers and the body so neither can be modified afterwards.
#
# NOTE(review): the receiver of the first +.freeze+ was stripped from the
# rendered listing; `headers` is restored as the receiver -- confirm against
# lib/cloudist/payload_old.rb.
#
# @return [Object] the frozen body (the value of the last expression)
def freeze!
  headers.freeze
  body.freeze
end
|
#frozen? ⇒ Boolean
39
40
41
|
# File 'lib/cloudist/payload_old.rb', line 39
# Reports whether the payload's headers are frozen.
#
# NOTE(review): the receiver of +.frozen?+ was stripped from the rendered
# listing; `headers` is restored as the receiver -- confirm against
# lib/cloudist/payload_old.rb.
#
# @return [Boolean] the result of +headers.frozen?+
def frozen?
  headers.frozen?
end
|
29
30
31
|
# File 'lib/cloudist/payload.rb', line 29
# Identifier of this payload; delegates to +find_or_create_id+ on every call.
#
# @return [Object] whatever +find_or_create_id+ returns
def id
  find_or_create_id
end
|
#id=(new_id) ⇒ Object
34
35
36
37
|
# File 'lib/cloudist/payload_old.rb', line 34
# Stores a new identifier in @id, converted to a String.
#
# @param new_id [#to_s] the identifier to store
# @return [String] the stringified identifier
def id=(new_id)
  @id = new_id.to_s
end
|
#message_type ⇒ Object
72
73
74
|
# File 'lib/cloudist/payload.rb', line 72
# Returns the message type recorded in the headers.
#
# NOTE(review): the receiver of +.message_type+ was stripped from the rendered
# listing; `headers` is restored because @headers is the hash-like attribute of
# this class -- confirm against lib/cloudist/payload.rb.
#
# @return [Object] the result of +headers.message_type+
def message_type
  headers.message_type
end
|
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/cloudist/payload_old.rb', line 79
# Returns a copy of a hash-like object with numeric entries normalized:
# :published_on is converted with +to_i+, and :ttl is converted with +to_i+
# (falling back to -1 if the conversion raises); a :ttl of 0 becomes -1.
# Returns an empty hash when the guard condition on the first line fails.
#
# NOTE(review): the method name, the guard condition after +unless+, and the
# receiver of +.dup+ were stripped from the rendered listing; both are
# presumably the +headers+ attribute -- confirm against
# lib/cloudist/payload_old.rb (line 79).
def
return { } unless
h = .dup
h[:published_on] = h[:published_on].to_i
h[:ttl] = h[:ttl].to_i rescue -1
h[:ttl] = -1 if h[:ttl] == 0
h
end
|
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/cloudist/payload.rb', line 37
# Fills in and normalizes entries on a hash-like object:
# * :published_on defaults to the body's "timestamp" entry (deleted from the
#   body) or else +timestamp+, and is then converted with +to_f+;
# * :message_type defaults to the body's "message_type" entry (deleted from
#   the body) or else 'reply';
# * :ttl defaults to Cloudist::DEFAULT_TTL, is converted with +to_i+ (falling
#   back to -1 if the conversion raises), and a value of 0 becomes -1;
# * :message_id is always set to +id+.
# Finally, if +timestamp+ is later than :published_on, @timestamp is lowered
# to the :published_on value.
#
# NOTE(review): the method name and the receiver of every +[...]+ were stripped
# from the rendered listing; the receiver is presumably the +headers+
# attribute -- confirm against lib/cloudist/payload.rb (line 37).
def
[:published_on] ||= body.delete("timestamp") || timestamp
[:message_type] ||= body.delete("message_type") || 'reply'
[:ttl] ||= Cloudist::DEFAULT_TTL
[:message_id] = id
[:published_on] = [:published_on].to_f
[:ttl] = [:ttl].to_i rescue -1
[:ttl] = -1 if [:ttl] == 0
if timestamp > [:published_on]
@timestamp = [:published_on]
end
end
|
#parse_message(raw) ⇒ Object
124
125
126
127
128
|
# File 'lib/cloudist/payload_old.rb', line 124
# Parses a raw message by delegating to +decode_message+.
#
# @param raw [Object] the raw message, passed through unchanged
# @return [Object] whatever +decode_message+ returns
def parse_message(raw)
  decode_message(raw)
end
|
138
139
140
141
142
|
# File 'lib/cloudist/payload_old.rb', line 138
# Marks the payload as published and freezes it. Does nothing when the
# payload has already been published.
#
# @return [Object, nil] the result of +freeze!+ on the first call, nil afterwards
def publish
  unless published?
    @published = true
    freeze!
  end
end
|
#published? ⇒ Boolean
134
135
136
|
# File 'lib/cloudist/payload_old.rb', line 134
# Reports whether the payload has been published.
#
# @return [Boolean] true only when @published is exactly +true+;
#   any other value (including nil) yields false
def published?
  @published == true
end
|
#reply_name(queue_name) ⇒ Object
101
102
103
104
|
# File 'lib/cloudist/payload_old.rb', line 101
# Builds the reply name for a queue by delegating to Utils.reply_prefix.
#
# @param queue_name [Object] the queue name, passed through unchanged
# @return [Object] whatever Utils.reply_prefix returns
def reply_name(queue_name)
  Utils.reply_prefix(queue_name)
end
|
68
69
70
|
# File 'lib/cloudist/payload.rb', line 68
# Returns the reply-to value recorded in the headers.
#
# NOTE(review): the receiver of +.reply_to+ was stripped from the rendered
# listing; `headers` is restored because @headers is the hash-like attribute of
# this class -- confirm against lib/cloudist/payload.rb.
#
# @return [Object] the result of +headers.reply_to+
def reply_to
  headers.reply_to
end
|
#set_master_queue_name(queue_name) ⇒ Object
97
98
99
|
# File 'lib/cloudist/payload_old.rb', line 97
# Records the master queue name under the :master_queue header.
#
# NOTE(review): the receiver of +[:master_queue]+ was stripped from the
# rendered listing; `headers` is restored as the receiver -- confirm against
# lib/cloudist/payload_old.rb.
#
# @param queue_name [Object] the queue name to store
# @return [Object] the stored queue name
def set_master_queue_name(queue_name)
  headers[:master_queue] = queue_name
end
|
#set_reply_to(queue_name) ⇒ Object
64
65
66
|
# File 'lib/cloudist/payload.rb', line 64
# Records the reply queue for this payload under the :reply_to header, using
# the name produced by +reply_prefix+.
#
# NOTE(review): the receiver of +[:reply_to]+ was stripped from the rendered
# listing; `headers` is restored because @headers is the hash-like attribute of
# this class -- confirm against lib/cloudist/payload.rb.
#
# @param queue_name [Object] the queue name handed to +reply_prefix+
# @return [Object] the value stored under :reply_to
def set_reply_to(queue_name)
  headers[:reply_to] = reply_prefix(queue_name)
end
|
33
34
35
|
# File 'lib/cloudist/payload.rb', line 33
# Returns the payload as a two-element array: the body passed through
# +encode+, followed by an options hash with a single :headers entry.
#
# NOTE(review): the value of the :headers entry was stripped from the rendered
# listing, so the code below is incomplete as shown; presumably it is the
# headers in some transport-ready form -- confirm against
# lib/cloudist/payload.rb (line 33).
def to_a
[encode(body), {:headers => }]
end
|
48
49
50
51
|
# File 'lib/cloudist/payload_old.rb', line 48
# Merges a set of headers into +publish_opts[:headers]+, creating that entry
# as an empty hash first when it is missing.
#
# NOTE(review): the method name, both sides of the assignment on the first
# body line, and the argument of +merge!+ were stripped from the rendered
# listing, so the source of the merged headers cannot be determined here --
# confirm against lib/cloudist/payload_old.rb (line 48).
def
=
(publish_opts[:headers] ||= {}).merge!()
end
|