Class: EventHub::Message

Inherits:
Object
  • Object
show all
Includes:
Helper
Defined in:
lib/eventhub/message.rb

Constant Summary collapse

VERSION =
'1.0.0'
REQUIRED_HEADERS =

Headers that are required (value can be nil) in order to pass valid?

[
  'message_id',
  'version',
  'created_at',
  'origin.module_id',
  'origin.type',
  'origin.site_id',
  'process.name',
  'process.step_position',
  'process.execution_id',
  'status.retried_count',
  'status.code',
  'status.message'
]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helper

#class_to_array, #duration, #format_string, #now_stamp

Constructor Details

#initialize(header = nil, body = nil, raw = nil) ⇒ Message

Returns a new instance of Message.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/eventhub/message.rb', line 46

def initialize(header = nil, body = nil, raw = nil)

  @header = header || {}
  @body   = body || {}
  @raw    = raw

  # set message defaults, that we have required headers
  @header.set('message_id', UUIDTools::UUID.timestamp_create.to_s, false)
  @header.set('version', VERSION, false)
  @header.set('created_at', now_stamp, false)

  @header.set('origin.module_id', 'undefined', false)
  @header.set('origin.type', 'undefined', false)
  @header.set('origin.site_id', 'undefined', false)

  @header.set('process.name', 'undefined', false)
  @header.set('process.execution_id', UUIDTools::UUID.timestamp_create.to_s, false)
  @header.set('process.step_position', 0, false)

  @header.set('status.retried_count', 0, false)
  @header.set('status.code', STATUS_INITIAL, false)
  @header.set('status.message', '', false)

end

Instance Attribute Details

#bodyObject

Returns the value of attribute body.



24
25
26
# File 'lib/eventhub/message.rb', line 24

def body
  @body
end

#headerObject

Returns the value of attribute header.



24
25
26
# File 'lib/eventhub/message.rb', line 24

def header
  @header
end

#rawObject

Returns the value of attribute raw.



24
25
26
# File 'lib/eventhub/message.rb', line 24

def raw
  @raw
end

#routing_keyObject

Returns the value of attribute routing_key.



24
25
26
# File 'lib/eventhub/message.rb', line 24

def routing_key
  @routing_key
end

#vhostObject

Returns the value of attribute vhost.



24
25
26
# File 'lib/eventhub/message.rb', line 24

def vhost
  @vhost
end

Class Method Details

.from_json(raw) ⇒ Object



39
40
41
42
43
44
# File 'lib/eventhub/message.rb', line 39

def self.from_json(raw)
  data = JSON.parse(raw)
  Message.new(data.get('header'), data.get('body'),raw)
rescue => e
  Message.new({ "status" =>  { "code" => STATUS_INVALID, "message" => "JSON parse error: #{e}" }} ,{ "original_message_base64_encoded" => Base64.encode64(raw)},raw)
end

.translate_status_code(code) ⇒ Object



125
126
127
128
129
130
131
132
133
134
# File 'lib/eventhub/message.rb', line 125

def self.translate_status_code(code)
  case code
    when EventHub::STATUS_INITIAL       then return 'STATUS_INITIAL'
    when EventHub::STATUS_SUCCESS       then return 'STATUS_SUCCESS'
    when EventHub::STATUS_RETRY         then return 'STATUS_RETRY'
    when EventHub::STATUS_RETRY_PENDING then return 'STATUS_RETRY_PENDING'
    when EventHub::STATUS_INVALID       then return 'STATUS_INVALID'
    when EventHub::STATUS_DEADLETTER    then return 'STATUS_DEADLETTER'
  end
end

Instance Method Details

#append_to_execution_history(processor_name) ⇒ Object



118
119
120
121
122
123
# File 'lib/eventhub/message.rb', line 118

def append_to_execution_history(processor_name)
  unless header.get('execution_history')
    header.set('execution_history', [])
  end
  header.get('execution_history') << {'processor' => processor_name, 'timestamp' => now_stamp}
end

#copy(status_code = STATUS_SUCCESS) ⇒ Object

copies the message and set’s provided status code (default: success), actual stamp, and a new message id



105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/eventhub/message.rb', line 105

def copy(status_code = STATUS_SUCCESS)

  # use Marshal dump and load to make a deep object copy
  copied_header = Marshal.load( Marshal.dump(header))
  copied_body   = Marshal.load( Marshal.dump(body))

  copied_header.set("message_id",UUIDTools::UUID.timestamp_create.to_s)
  copied_header.set("created_at",now_stamp)
  copied_header.set("status.code",status_code)

  Message.new(copied_header, copied_body)
end

#initial?Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/eventhub/message.rb', line 84

def initial?
  self.status_code == STATUS_INITIAL
end

#invalid?Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/eventhub/message.rb', line 92

def invalid?
  self.status_code == STATUS_INVALID
end

#retry?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/eventhub/message.rb', line 80

def retry?
  !success?
end

#retry_pending?Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/eventhub/message.rb', line 88

def retry_pending?
  self.status_code == STATUS_RETRY_PENDING
end

#success?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/eventhub/message.rb', line 76

def success?
  self.status_code == STATUS_SUCCESS
end

#to_jsonObject



96
97
98
# File 'lib/eventhub/message.rb', line 96

def to_json
  {'header' => self.header, 'body' => self.body}.to_json
end

#to_sObject



100
101
102
# File 'lib/eventhub/message.rb', line 100

def to_s
  "Msg: process [#{self.process_name},#{self.process_step_position},#{self.process_execution_id}], status [#{self.status_code},#{self.status_message},#{self.status_retried_count}]"
end

#valid?Boolean

Returns:

  • (Boolean)


71
72
73
74
# File 'lib/eventhub/message.rb', line 71

def valid?
  # check for existence and defined value
  REQUIRED_HEADERS.all? { |key| @header.all_keys_with_path.include?(key) && !!self.send(key.gsub(/\./,"_").to_sym)}
end