Class: Sequent::Core::AggregateRoot

Inherits:
Object
  • Object
show all
Extended by:
ActiveSupport::DescendantsTracker
Includes:
Helpers::AutosetAttributes, Helpers::MessageHandler, Helpers::UniqueKeys, SnapshotConfiguration
Defined in:
lib/sequent/core/aggregate_root.rb

Overview

Base class for all your domain classes.

Provides load_from_history functionality so that an aggregate can be rebuilt from its history, meaning a stream of events.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SnapshotConfiguration

included

Methods included from Helpers::UniqueKeys

included, #unique_keys

Methods included from Helpers::AutosetAttributes

included

Methods included from Helpers::MessageHandler

#dispatch_message, #handle_message, included

Constructor Details

#initialize(id) ⇒ AggregateRoot

Returns a new instance of AggregateRoot.



63
64
65
66
67
# File 'lib/sequent/core/aggregate_root.rb', line 63

# Sets up the initial state of a new aggregate root.
#
# @param id [Object] the identifier stored in @id
def initialize(id)
  # No events have been recorded yet.
  @uncommitted_events = []
  @id = id
  # Numbering starts at 1.
  @sequence_number = 1
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



45
46
47
# File 'lib/sequent/core/aggregate_root.rb', line 45

# Reader for @id.
#
# @return [Object] the current value of @id
def id = @id

#latest_snapshot_sequence_number ⇒ Object

Returns the value of attribute latest_snapshot_sequence_number.



46
47
48
# File 'lib/sequent/core/aggregate_root.rb', line 46

# Reader for @latest_snapshot_sequence_number.
#
# @return [Object, nil] the current value of @latest_snapshot_sequence_number
#   (nil when it has never been assigned)
def latest_snapshot_sequence_number = @latest_snapshot_sequence_number

#sequence_number ⇒ Object (readonly)

Returns the value of attribute sequence_number.



45
46
47
# File 'lib/sequent/core/aggregate_root.rb', line 45

# Reader for @sequence_number.
#
# @return [Object] the current value of @sequence_number
def sequence_number = @sequence_number

#uncommitted_events ⇒ Object (readonly)

Returns the value of attribute uncommitted_events.



45
46
47
# File 'lib/sequent/core/aggregate_root.rb', line 45

# Reader for @uncommitted_events.
#
# @return [Object] the current value of @uncommitted_events
def uncommitted_events = @uncommitted_events

Class Method Details

.load_from_history(stream, events) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/sequent/core/aggregate_root.rb', line 48

# Builds an aggregate root from its recorded events.
#
# When the first event is a SnapshotEvent the aggregate is restored from the
# Base64-encoded, marshalled snapshot in its +data+, marked with the
# snapshot's sequence number, and the remaining events are applied on top.
# Otherwise a bare instance is allocated (without calling +new+) and the
# instance-level #load_from_history replays every event.
#
# @param stream [Object] passed to the instance-level #load_from_history;
#   not used when restoring from a snapshot
# @param events [Array] the events to rebuild from
# @return [Object] the rebuilt aggregate root
def self.load_from_history(stream, events)
  head, *tail = events

  unless head.is_a?(SnapshotEvent)
    # allocate without calling new
    return allocate.tap { |blank_root| blank_root.load_from_history(stream, events) }
  end

  # NOTE(review): Marshal.load is only safe on trusted data - this assumes the
  # snapshot payload was produced by this application; confirm.
  # rubocop:disable Security/MarshalLoad
  restored = Marshal.load(Base64.decode64(head.data))
  # rubocop:enable Security/MarshalLoad
  restored.latest_snapshot_sequence_number = head.sequence_number
  tail.each { |later_event| restored.apply_event(later_event) }
  restored
end

.stream_from_history(stream) ⇒ Object



93
94
95
96
97
# File 'lib/sequent/core/aggregate_root.rb', line 93

# Allocates an aggregate root (without calling +new+) and prepares it for
# streaming via #initialize_for_streaming.
#
# @param stream [Object] handed to #initialize_for_streaming
# @return [Object] the allocated, initialized aggregate root
def self.stream_from_history(stream)
  allocate.tap { |streaming_root| streaming_root.initialize_for_streaming(stream) }
end

Instance Method Details

#apply_event(event) ⇒ Object



138
139
140
141
# File 'lib/sequent/core/aggregate_root.rb', line 138

# Dispatches the event through handle_message and then moves
# @sequence_number to one past the event's sequence number.
#
# @param event [Object] must respond to +sequence_number+
# @return [Object] the new value of @sequence_number
def apply_event(event)
  handle_message(event)
  following_number = event.sequence_number + 1
  @sequence_number = following_number
end

#clear_events ⇒ Object



124
125
126
# File 'lib/sequent/core/aggregate_root.rb', line 124

# Discards the uncommitted events by pointing @uncommitted_events at a new,
# empty array (the previous array object is left untouched).
#
# @return [Array] the new empty array
def clear_events
  @uncommitted_events = []
end

#event_stream ⇒ Object



103
104
105
106
107
108
109
110
111
# File 'lib/sequent/core/aggregate_root.rb', line 103

# Builds an EventStream describing this aggregate.
#
# +snapshot_outdated_at+ is the current time when #snapshot_outdated? is
# truthy and nil otherwise.
#
# @return [EventStream]
def event_stream
  # Entries are evaluated in the order they are listed.
  stream_attributes = {
    aggregate_type: self.class.name,
    aggregate_id: id,
    events_partition_key: events_partition_key,
    snapshot_outdated_at: (Time.now if snapshot_outdated?),
    unique_keys: unique_keys,
  }
  EventStream.new(**stream_attributes)
end

#events_partition_key ⇒ Object

Provide the partitioning key for storing events. This value must be a string and will be used by PostgreSQL to store the events in the right partition.

The value may change over the lifetime of the aggregate; old events will be moved to the correct partition after a change. This can be an expensive database operation.



120
121
122
# File 'lib/sequent/core/aggregate_root.rb', line 120

# Partitioning key used when storing this aggregate's events.
#
# This base implementation always returns nil.
# NOTE(review): presumably subclasses override this to return a String - confirm.
#
# @return [nil]
def events_partition_key = nil

#initialize_for_streaming(stream) ⇒ Object



79
80
81
82
83
# File 'lib/sequent/core/aggregate_root.rb', line 79

# Resets the event bookkeeping and remembers the given stream, without
# touching @id.
#
# @param stream [Object] stored in @event_stream
# @return [Object] the given stream
def initialize_for_streaming(stream)
  @sequence_number = 1
  @uncommitted_events = []
  # Kept last so the method still evaluates to the stream.
  @event_stream = stream
end

#load_from_history(stream, events) ⇒ Object



69
70
71
72
73
74
75
76
77
# File 'lib/sequent/core/aggregate_root.rb', line 69

# Rebuilds this instance's state by replaying the given events.
#
# Takes the id from the first event, resets the bookkeeping state, stores the
# stream and then passes every event to #apply_event in order.
#
# @param stream [Object] stored in @event_stream
# @param events [Array] events to replay; each is passed to #apply_event and
#   the first must respond to +aggregate_id+
# @return [Array] the given events
# @raise [RuntimeError] 'Empty history' when +events+ is empty
def load_from_history(stream, events)
  raise 'Empty history' if events.empty?

  @id = events.first.aggregate_id
  @event_stream = stream
  @sequence_number = 1
  @uncommitted_events = []
  events.each { |past_event| apply_event(past_event) }
end

#snapshot_outdated? ⇒ Boolean

Returns:
  • (Boolean)



128
129
130
131
132
# File 'lib/sequent/core/aggregate_root.rb', line 128

# Tells whether a new snapshot is due: true when the class defines a
# snapshot threshold and the distance between @sequence_number and the latest
# snapshot's sequence number (1 when there is none) has reached it.
#
# @return [Boolean]
def snapshot_outdated?
  limit = self.class.snapshot_default_threshold
  baseline = latest_snapshot_sequence_number || 1
  distance = @sequence_number - baseline
  limit.present? && distance >= limit
end

#stream_from_history(stream_events) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/sequent/core/aggregate_root.rb', line 85

# Applies one streamed event to this aggregate.
#
# @param stream_events [Array] a pair whose second element is the event to
#   apply; the first element is ignored here
# @return [Object] whatever #apply_event returns
# @raise [RuntimeError] 'Empty history' when the event is blank
def stream_from_history(stream_events)
  _ignored_stream, event = stream_events
  raise 'Empty history' if event.blank?

  # Only take the id from the event when none has been set yet.
  @id ||= event.aggregate_id
  apply_event(event)
end

#take_snapshot ⇒ Object



134
135
136
# File 'lib/sequent/core/aggregate_root.rb', line 134

# Records a snapshot of this aggregate: the object is marshalled,
# Base64-encoded and handed to build_event as the +data+ of a SnapshotEvent.
#
# @return [Object] whatever build_event returns
def take_snapshot
  serialized = Marshal.dump(self)
  encoded = Base64.encode64(serialized)
  build_event(SnapshotEvent, data: encoded)
end

#to_s ⇒ Object



99
100
101
# File 'lib/sequent/core/aggregate_root.rb', line 99

# Human-readable label made of the class name and @id, separated by ": ".
#
# @return [String]
def to_s
  type_name = self.class.name
  "#{type_name}: #{@id}"
end