Class: EventHub::Message
- Inherits:
-
Object
- Object
- EventHub::Message
- Includes:
- Helper
- Defined in:
- lib/eventhub/message.rb
Constant Summary collapse
- VERSION =
'1.0.0'
- REQUIRED_HEADERS =
Headers that are required (value can be nil) in order to pass valid?
[ 'message_id', 'version', 'created_at', 'origin.module_id', 'origin.type', 'origin.site_id', 'process.name', 'process.step_position', 'process.execution_id', 'status.retried_count', 'status.code', 'status.message' ]
Instance Attribute Summary collapse
-
#body ⇒ Object
Returns the value of attribute body.
-
#header ⇒ Object
Returns the value of attribute header.
-
#raw ⇒ Object
Returns the value of attribute raw.
-
#routing_key ⇒ Object
Returns the value of attribute routing_key.
-
#vhost ⇒ Object
Returns the value of attribute vhost.
Class Method Summary collapse
Instance Method Summary collapse
- #append_to_execution_history(processor_name) ⇒ Object
-
#copy(status_code = STATUS_SUCCESS) ⇒ Object
copies the message and set’s provided status code (default: success), actual stamp, and a new message id.
- #initial? ⇒ Boolean
-
#initialize(header = nil, body = nil, raw = nil) ⇒ Message
constructor
A new instance of Message.
- #invalid? ⇒ Boolean
- #retry? ⇒ Boolean
- #retry_pending? ⇒ Boolean
- #success? ⇒ Boolean
- #to_json ⇒ Object
- #to_s ⇒ Object
- #valid? ⇒ Boolean
Methods included from Helper
#class_to_array, #duration, #format_string, #now_stamp
Constructor Details
#initialize(header = nil, body = nil, raw = nil) ⇒ Message
Returns a new instance of Message.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/eventhub/message.rb', line 46 def initialize(header = nil, body = nil, raw = nil) @header = header || {} @body = body || {} @raw = raw # set message defaults, that we have required headers @header.set('message_id', UUIDTools::UUID..to_s, false) @header.set('version', VERSION, false) @header.set('created_at', now_stamp, false) @header.set('origin.module_id', 'undefined', false) @header.set('origin.type', 'undefined', false) @header.set('origin.site_id', 'undefined', false) @header.set('process.name', 'undefined', false) @header.set('process.execution_id', UUIDTools::UUID..to_s, false) @header.set('process.step_position', 0, false) @header.set('status.retried_count', 0, false) @header.set('status.code', STATUS_INITIAL, false) @header.set('status.message', '', false) end |
Instance Attribute Details
#body ⇒ Object
Returns the value of attribute body.
24 25 26 |
# File 'lib/eventhub/message.rb', line 24 def body @body end |
#header ⇒ Object
Returns the value of attribute header.
24 25 26 |
# File 'lib/eventhub/message.rb', line 24 def header @header end |
#raw ⇒ Object
Returns the value of attribute raw.
24 25 26 |
# File 'lib/eventhub/message.rb', line 24 def raw @raw end |
#routing_key ⇒ Object
Returns the value of attribute routing_key.
24 25 26 |
# File 'lib/eventhub/message.rb', line 24 def routing_key @routing_key end |
#vhost ⇒ Object
Returns the value of attribute vhost.
24 25 26 |
# File 'lib/eventhub/message.rb', line 24 def vhost @vhost end |
Class Method Details
.from_json(raw) ⇒ Object
39 40 41 42 43 44 |
# File 'lib/eventhub/message.rb', line 39 def self.from_json(raw) data = JSON.parse(raw) Message.new(data.get('header'), data.get('body'),raw) rescue => e Message.new({ "status" => { "code" => STATUS_INVALID, "message" => "JSON parse error: #{e}" }} ,{ "original_message_base64_encoded" => Base64.encode64(raw)},raw) end |
.translate_status_code(code) ⇒ Object
125 126 127 128 129 130 131 132 133 134 |
# File 'lib/eventhub/message.rb', line 125 def self.translate_status_code(code) case code when EventHub::STATUS_INITIAL then return 'STATUS_INITIAL' when EventHub::STATUS_SUCCESS then return 'STATUS_SUCCESS' when EventHub::STATUS_RETRY then return 'STATUS_RETRY' when EventHub::STATUS_RETRY_PENDING then return 'STATUS_RETRY_PENDING' when EventHub::STATUS_INVALID then return 'STATUS_INVALID' when EventHub::STATUS_DEADLETTER then return 'STATUS_DEADLETTER' end end |
Instance Method Details
#append_to_execution_history(processor_name) ⇒ Object
118 119 120 121 122 123 |
# File 'lib/eventhub/message.rb', line 118 def append_to_execution_history(processor_name) unless header.get('execution_history') header.set('execution_history', []) end header.get('execution_history') << {'processor' => processor_name, 'timestamp' => now_stamp} end |
#copy(status_code = STATUS_SUCCESS) ⇒ Object
copies the message and set’s provided status code (default: success), actual stamp, and a new message id
105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/eventhub/message.rb', line 105 def copy(status_code = STATUS_SUCCESS) # use Marshal dump and load to make a deep object copy copied_header = Marshal.load( Marshal.dump(header)) copied_body = Marshal.load( Marshal.dump(body)) copied_header.set("message_id",UUIDTools::UUID..to_s) copied_header.set("created_at",now_stamp) copied_header.set("status.code",status_code) Message.new(copied_header, copied_body) end |
#initial? ⇒ Boolean
84 85 86 |
# File 'lib/eventhub/message.rb', line 84 def initial? self.status_code == STATUS_INITIAL end |
#invalid? ⇒ Boolean
92 93 94 |
# File 'lib/eventhub/message.rb', line 92 def invalid? self.status_code == STATUS_INVALID end |
#retry? ⇒ Boolean
80 81 82 |
# File 'lib/eventhub/message.rb', line 80 def retry? !success? end |
#retry_pending? ⇒ Boolean
88 89 90 |
# File 'lib/eventhub/message.rb', line 88 def retry_pending? self.status_code == STATUS_RETRY_PENDING end |
#success? ⇒ Boolean
76 77 78 |
# File 'lib/eventhub/message.rb', line 76 def success? self.status_code == STATUS_SUCCESS end |
#to_json ⇒ Object
96 97 98 |
# File 'lib/eventhub/message.rb', line 96 def to_json {'header' => self.header, 'body' => self.body}.to_json end |
#to_s ⇒ Object
100 101 102 |
# File 'lib/eventhub/message.rb', line 100 def to_s "Msg: process [#{self.process_name},#{self.process_step_position},#{self.process_execution_id}], status [#{self.status_code},#{self.},#{self.status_retried_count}]" end |
#valid? ⇒ Boolean
71 72 73 74 |
# File 'lib/eventhub/message.rb', line 71 def valid? # check for existence and defined value REQUIRED_HEADERS.all? { |key| @header.all_keys_with_path.include?(key) && !!self.send(key.gsub(/\./,"_").to_sym)} end |