Class: Sequent::Core::AggregateRoot
Overview
Base class for all your domain classes.
load_from_history
functionality to be loaded_from_history, meaning a stream of events.
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
included
included, #unique_keys
included
#dispatch_message, #handle_message, included
Constructor Details
Returns a new instance of AggregateRoot.
63
64
65
66
67
|
# File 'lib/sequent/core/aggregate_root.rb', line 63
def initialize(id)
@id = id
@uncommitted_events = []
@sequence_number = 1
end
|
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
45
46
47
|
# File 'lib/sequent/core/aggregate_root.rb', line 45
def id
@id
end
|
#latest_snapshot_sequence_number ⇒ Object
Returns the value of attribute latest_snapshot_sequence_number.
46
47
48
|
# File 'lib/sequent/core/aggregate_root.rb', line 46
def latest_snapshot_sequence_number
@latest_snapshot_sequence_number
end
|
#sequence_number ⇒ Object
Returns the value of attribute sequence_number.
45
46
47
|
# File 'lib/sequent/core/aggregate_root.rb', line 45
def sequence_number
@sequence_number
end
|
#uncommitted_events ⇒ Object
Returns the value of attribute uncommitted_events.
45
46
47
|
# File 'lib/sequent/core/aggregate_root.rb', line 45
def uncommitted_events
@uncommitted_events
end
|
Class Method Details
.load_from_history(stream, events) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/sequent/core/aggregate_root.rb', line 48
def self.load_from_history(stream, events)
first, *rest = events
if first.is_a? SnapshotEvent
aggregate_root = Marshal.load(Base64.decode64(first.data))
aggregate_root.latest_snapshot_sequence_number = first.sequence_number
rest.each { |x| aggregate_root.apply_event(x) }
else
aggregate_root = allocate aggregate_root.load_from_history(stream, events)
end
aggregate_root
end
|
.stream_from_history(stream) ⇒ Object
93
94
95
96
97
|
# File 'lib/sequent/core/aggregate_root.rb', line 93
def self.stream_from_history(stream)
aggregate_root = allocate
aggregate_root.initialize_for_streaming(stream)
aggregate_root
end
|
Instance Method Details
#apply_event(event) ⇒ Object
138
139
140
141
|
# File 'lib/sequent/core/aggregate_root.rb', line 138
def apply_event(event)
handle_message(event)
@sequence_number = event.sequence_number + 1
end
|
#clear_events ⇒ Object
124
125
126
|
# File 'lib/sequent/core/aggregate_root.rb', line 124
def clear_events
@uncommitted_events = []
end
|
#event_stream ⇒ Object
103
104
105
106
107
108
109
110
111
|
# File 'lib/sequent/core/aggregate_root.rb', line 103
def event_stream
EventStream.new(
aggregate_type: self.class.name,
aggregate_id: id,
events_partition_key: events_partition_key,
snapshot_outdated_at: snapshot_outdated? ? Time.now : nil,
unique_keys:,
)
end
|
#events_partition_key ⇒ Object
Provide the partitioning key for storing events. This value must be a string and will be used by PostgreSQL to store the events in the right partition.
The value may change over the lifetime of the aggregate, old events will be moved to the correct partition after a change. This can be an expensive database operation.
120
121
122
|
# File 'lib/sequent/core/aggregate_root.rb', line 120
def events_partition_key
nil
end
|
#initialize_for_streaming(stream) ⇒ Object
79
80
81
82
83
|
# File 'lib/sequent/core/aggregate_root.rb', line 79
def initialize_for_streaming(stream)
@uncommitted_events = []
@sequence_number = 1
@event_stream = stream
end
|
#load_from_history(stream, events) ⇒ Object
69
70
71
72
73
74
75
76
77
|
# File 'lib/sequent/core/aggregate_root.rb', line 69
def load_from_history(stream, events)
fail 'Empty history' if events.empty?
@id = events.first.aggregate_id
@uncommitted_events = []
@sequence_number = 1
@event_stream = stream
events.each { |event| apply_event(event) }
end
|
#snapshot_outdated? ⇒ Boolean
128
129
130
131
132
|
# File 'lib/sequent/core/aggregate_root.rb', line 128
def snapshot_outdated?
snapshot_threshold = self.class.snapshot_default_threshold
events_since_latest_snapshot = @sequence_number - (latest_snapshot_sequence_number || 1)
snapshot_threshold.present? && events_since_latest_snapshot >= snapshot_threshold
end
|
#stream_from_history(stream_events) ⇒ Object
85
86
87
88
89
90
91
|
# File 'lib/sequent/core/aggregate_root.rb', line 85
def stream_from_history(stream_events)
_stream, event = stream_events
fail 'Empty history' if event.blank?
@id ||= event.aggregate_id
apply_event(event)
end
|
#take_snapshot ⇒ Object
134
135
136
|
# File 'lib/sequent/core/aggregate_root.rb', line 134
def take_snapshot
build_event SnapshotEvent, data: Base64.encode64(Marshal.dump(self))
end
|
#to_s ⇒ Object
99
100
101
|
# File 'lib/sequent/core/aggregate_root.rb', line 99
def to_s
"#{self.class.name}: #{@id}"
end
|