Class: Sourced::Message

Inherits:
Types::Data
  • Object
show all
Defined in:
lib/sourced/message.rb

Direct Known Subclasses

Command, Event

Defined Under Namespace

Classes: Payload, Registry

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs = {}) ⇒ Message

Returns a new instance of Message.



135
136
137
138
139
140
# File 'lib/sourced/message.rb', line 135

# Builds a message, guaranteeing that a payload is always handed to the
# superclass constructor.
#
# @param attrs [Hash] message attributes; left untouched (a merged copy is
#   used when a default payload has to be added)
# @return [Message]
def initialize(attrs = {})
  # A missing, nil or false :payload is replaced with an empty Hash.
  normalized = attrs[:payload] ? attrs : attrs.merge(payload: {})
  super(normalized)
end

Class Method Details

.define(type_str, payload_schema: nil, &payload_block) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/sourced/message.rb', line 109

# Creates a subclass of the receiver for the given message type and stores it
# in the registry under that type string.
#
# A warning is logged (but the definition still goes ahead and replaces the
# registry entry) when the type string is already registered.
#
# @param type_str [String] message type identifier; frozen in place
# @param payload_schema [Object, nil] when given, the payload attribute is
#   declared as +Payload[payload_schema]+
# @param payload_block [Proc] when given (and no +payload_schema+), forwarded
#   as the block of the +:payload+ attribute declaration
# @return [Class] the newly created message class
def self.define(type_str, payload_schema: nil, &payload_block)
  type_str.freeze unless type_str.frozen?
  Sourced.config.logger.warn("Message '#{type_str}' already defined") if registry[type_str]

  message_class = Class.new(self) do
    def self.node_name = :data
    define_singleton_method(:type) { type_str }

    attribute :type, Types::Static[type_str]
    # An explicit schema wins over a block; with neither, no payload
    # attribute is declared here.
    if payload_schema
      attribute :payload, Payload[payload_schema]
    elsif payload_block
      attribute :payload, Payload, &payload_block
    end
  end

  registry[type_str] = message_class
end

.from(attrs) ⇒ Object



128
129
130
131
132
133
# File 'lib/sourced/message.rb', line 128

# Instantiates the message class registered under +attrs[:type]+.
#
# @param attrs [Hash] message attributes; +:type+ selects the class
# @return [Object] result of calling +new(attrs)+ on the registered class
# @raise [UnknownMessageError] when no class is registered for the type
def self.from(attrs)
  message_type = attrs[:type]
  message_class = registry[message_type]
  return message_class.new(attrs) if message_class

  raise UnknownMessageError, "Unknown event type: #{message_type}"
end

.registry ⇒ Object



100
101
102
# File 'lib/sourced/message.rb', line 100

# Returns the registry for the receiver, building it on first use and
# caching it in the +@registry+ instance variable of the receiver.
#
# @return [Registry]
def self.registry
  return @registry if @registry

  @registry = Registry.new(self)
end

Instance Method Details

#delay(datetime) ⇒ Object



180
181
182
183
184
185
# File 'lib/sourced/message.rb', line 180

# Re-stamps the message with a later +created_at+.
#
# @param datetime [#<] the new +created_at+ value; must not be earlier than
#   the current +created_at+ (an equal value is accepted)
# @return [Object] whatever +with(created_at: datetime)+ returns
#   (presumably a copy of the message — confirm against +with+)
# @raise [PastMessageDateError] when +datetime+ is before +created_at+
def delay(datetime)
  raise PastMessageDateError, "Message #{type} can't be delayed to a date in the past" if datetime < created_at

  with(created_at: datetime)
end

#follow(event_class, payload_attrs = nil) ⇒ Object



149
150
151
152
153
154
# File 'lib/sourced/message.rb', line 149

# Convenience wrapper around {#follow_with_attributes} that only supplies a
# payload.
#
# @param event_class [#parse] class of the message to build
# @param payload_attrs [#to_h, nil] payload for the new message, if any
# @return [Object] result of {#follow_with_attributes}
def follow(event_class, payload_attrs = nil)
  follow_with_attributes(event_class, payload: payload_attrs)
end

#follow_with_attributes(event_class, attrs: {}, payload: nil, metadata: nil) ⇒ Object



172
173
174
175
176
177
178
# File 'lib/sourced/message.rb', line 172

# Builds a message of +event_class+ that is causally linked to this one.
#
# The new message's attributes start from this message's +stream_id+ and
# +correlation_id+, use this message's +id+ as +causation_id+, and carry this
# message's metadata. Entries in +attrs+ override those defaults.
#
# @param event_class [#parse] class of the message to build
# @param attrs [Hash] attribute overrides merged on top of the defaults
# @param payload [#to_h, nil] payload for the new message; converted with
#   +to_h+ and set only when truthy
# @param metadata [Hash, nil] extra metadata merged over this message's
#   metadata when given
# @return [Object] result of +event_class.parse+
def follow_with_attributes(event_class, attrs: {}, payload: nil, metadata: nil)
  # `self.` is required: the +metadata+ keyword argument shadows the reader.
  meta = self.metadata
  meta = meta.merge(metadata) if metadata
  attrs = { stream_id:, causation_id: id, correlation_id:, metadata: meta }.merge(attrs)
  attrs[:payload] = payload.to_h if payload
  event_class.parse(attrs)
end

#follow_with_seq(event_class, seq, payload_attrs = nil) ⇒ Object



156
157
158
159
160
161
162
# File 'lib/sourced/message.rb', line 156

# Like {#follow}, but also sets +seq+ on the new message.
#
# @param event_class [#parse] class of the message to build
# @param seq [Object] value passed as the +:seq+ attribute override
# @param payload_attrs [#to_h, nil] payload for the new message, if any
# @return [Object] result of {#follow_with_attributes}
def follow_with_seq(event_class, seq, payload_attrs = nil)
  overrides = { seq: }
  follow_with_attributes(event_class, attrs: overrides, payload: payload_attrs)
end

#follow_with_stream_id(event_class, stream_id, payload_attrs = nil) ⇒ Object



164
165
166
167
168
169
170
# File 'lib/sourced/message.rb', line 164

# Like {#follow}, but targets a different stream by overriding +stream_id+
# on the new message.
#
# @param event_class [#parse] class of the message to build
# @param stream_id [Object] value passed as the +:stream_id+ attribute override
# @param payload_attrs [#to_h, nil] payload for the new message, if any
# @return [Object] result of {#follow_with_attributes}
def follow_with_stream_id(event_class, stream_id, payload_attrs = nil)
  overrides = { stream_id: }
  follow_with_attributes(event_class, attrs: overrides, payload: payload_attrs)
end

#to_json ⇒ Object



187
188
189
# File 'lib/sourced/message.rb', line 187

# Serializes the message by converting it with +to_h+ and delegating to that
# result's +to_json+, forwarding any positional arguments unchanged.
#
# @return [Object] whatever +to_h.to_json+ returns
def to_json(*args)
  as_hash = to_h
  as_hash.to_json(*args)
end

#with_metadata(meta = {}) ⇒ Object



142
143
144
145
146
147
# File 'lib/sourced/message.rb', line 142

# Returns the message with +meta+ merged over its current metadata.
#
# @param meta [Hash] metadata entries to add or override
# @return [Object] +self+ when +meta+ is empty; otherwise whatever
#   +with(metadata: ...)+ returns (presumably a copy — confirm against +with+)
def with_metadata(meta = {})
  # Nothing to merge: skip building a new message.
  return self if meta.empty?

  attrs = metadata.merge(meta)
  with(metadata: attrs)
end