Class: Sourced::Message
- Inherits:
-
Types::Data
- Object
- Types::Data
- Sourced::Message
show all
- Defined in:
- lib/sourced/message.rb
Defined Under Namespace
Classes: Payload, Registry
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(attrs = {}) ⇒ Message
Returns a new instance of Message.
135
136
137
138
139
140
|
# File 'lib/sourced/message.rb', line 135
def initialize(attrs = {})
unless attrs[:payload]
attrs = attrs.merge(payload: {})
end
super(attrs)
end
|
Class Method Details
.define(type_str, payload_schema: nil, &payload_block) ⇒ Object
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/sourced/message.rb', line 109
def self.define(type_str, payload_schema: nil, &payload_block)
type_str.freeze unless type_str.frozen?
if registry[type_str]
Sourced.config.logger.warn("Message '#{type_str}' already defined")
end
registry[type_str] = Class.new(self) do
def self.node_name = :data
define_singleton_method(:type) { type_str }
attribute :type, Types::Static[type_str]
if payload_schema
attribute :payload, Payload[payload_schema]
elsif block_given?
attribute :payload, Payload, &payload_block if block_given?
end
end
end
|
.from(attrs) ⇒ Object
128
129
130
131
132
133
|
# File 'lib/sourced/message.rb', line 128
def self.from(attrs)
klass = registry[attrs[:type]]
raise UnknownMessageError, "Unknown event type: #{attrs[:type]}" unless klass
klass.new(attrs)
end
|
.registry ⇒ Object
100
101
102
|
# File 'lib/sourced/message.rb', line 100
def self.registry
@registry ||= Registry.new(self)
end
|
Instance Method Details
#delay(datetime) ⇒ Object
180
181
182
183
184
185
|
# File 'lib/sourced/message.rb', line 180
def delay(datetime)
if datetime < created_at
raise PastMessageDateError, "Message #{type} can't be delayed to a date in the past"
end
with(created_at: datetime)
end
|
#follow(event_class, payload_attrs = nil) ⇒ Object
149
150
151
152
153
154
|
# File 'lib/sourced/message.rb', line 149
def follow(event_class, payload_attrs = nil)
follow_with_attributes(
event_class,
payload: payload_attrs
)
end
|
#follow_with_attributes(event_class, attrs: {}, payload: nil, metadata: nil) ⇒ Object
172
173
174
175
176
177
178
|
# File 'lib/sourced/message.rb', line 172
def follow_with_attributes(event_class, attrs: {}, payload: nil, metadata: nil)
meta = self.metadata
meta = meta.merge(metadata) if metadata
attrs = { stream_id:, causation_id: id, correlation_id:, metadata: meta }.merge(attrs)
attrs[:payload] = payload.to_h if payload
event_class.parse(attrs)
end
|
#follow_with_seq(event_class, seq, payload_attrs = nil) ⇒ Object
156
157
158
159
160
161
162
|
# File 'lib/sourced/message.rb', line 156
def follow_with_seq(event_class, seq, payload_attrs = nil)
follow_with_attributes(
event_class,
attrs: { seq: },
payload: payload_attrs
)
end
|
#follow_with_stream_id(event_class, stream_id, payload_attrs = nil) ⇒ Object
164
165
166
167
168
169
170
|
# File 'lib/sourced/message.rb', line 164
def follow_with_stream_id(event_class, stream_id, payload_attrs = nil)
follow_with_attributes(
event_class,
attrs: { stream_id: },
payload: payload_attrs
)
end
|
#to_json ⇒ Object
187
188
189
|
# File 'lib/sourced/message.rb', line 187
def to_json(*)
to_h.to_json(*)
end
|
142
143
144
145
146
147
|
# File 'lib/sourced/message.rb', line 142
def with_metadata(meta = {})
return self if meta.empty?
attrs = metadata.merge(meta)
with(metadata: attrs)
end
|