Class: Sequent::Core::AggregateRepository
- Inherits: Object
- Defined in: lib/sequent/core/aggregate_repository.rb
Overview
Repository for aggregates.
Implements the Unit-Of-Work and Identity-Map patterns to ensure each aggregate is only loaded once per transaction and that you always get the same aggregate instance back.
On commit all aggregates associated with the Unit-Of-Work are queried for uncommitted events. After persisting these events the uncommitted events are cleared from the aggregate.
The repository keeps track of the Unit-Of-Work per thread, so a single repository instance can be shared between threads.
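The per-thread bookkeeping described above can be sketched in a few lines. This is a minimal illustration and not Sequent's code: the SketchRepository class, its add and fetch methods, and the SKETCH_KEY constant are hypothetical names, and plain strings stand in for aggregates.

```ruby
# Hypothetical key and class names; this is a sketch, not Sequent's implementation.
SKETCH_KEY = :sketch_unit_of_work

class SketchRepository
  # Each thread lazily gets its own Hash of id => aggregate.
  def aggregates
    Thread.current[SKETCH_KEY] ||= {}
  end

  def add(id, aggregate)
    aggregates[id] = aggregate
  end

  def fetch(id)
    aggregates[id]
  end
end

repo = SketchRepository.new
repo.add('a-1', 'main thread aggregate')

# A second thread sharing the same repository object starts with an empty map.
seen_in_other_thread = Thread.new { repo.fetch('a-1') }.value

# The main thread still gets back what it registered.
main_value = repo.fetch('a-1')
```

Because the map lives in thread-local storage rather than in an instance variable, the repository object itself holds no per-request state.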
Defined Under Namespace
Classes: AggregateNotFound, HasUncommittedEvents, NonUniqueAggregateId
Constant Summary
- AGGREGATES_KEY = 'Sequent::Core::AggregateRepository::aggregates'.to_sym
Key used in thread local
Instance Method Summary
-
#add_aggregate(aggregate) ⇒ Object
Adds the given aggregate to the repository (or unit of work).
-
#clear ⇒ Object
Clears the Unit of Work.
-
#clear! ⇒ Object
Clears the Unit of Work.
-
#commit(command) ⇒ Object
Gets all uncommitted_events from the ‘registered’ aggregates and stores them in the event store.
-
#contains_aggregate?(aggregate_id) ⇒ Boolean
Returns whether the event store has an aggregate with the given id.
-
#ensure_exists(aggregate_id, clazz) ⇒ Object
Raises an exception if the aggregate does not exist.
-
#load_aggregate(aggregate_id, clazz = nil) ⇒ Object
Loads the aggregate with the given id and class. Returns the one in the current Unit Of Work; otherwise loads it from history.
-
#load_aggregates(aggregate_ids, clazz = nil) ⇒ Object
Loads multiple aggregates at once.
Instance Method Details
#add_aggregate(aggregate) ⇒ Object
Adds the given aggregate to the repository (or unit of work).
Only when commit is called are all aggregates in the unit of work ‘processed’ and all uncommitted_events stored in the event_store.
# File 'lib/sequent/core/aggregate_repository.rb', line 40
def add_aggregate(aggregate)
  existing = aggregates[aggregate.id]
  if existing && !existing.equal?(aggregate)
    fail NonUniqueAggregateId.new(aggregate, aggregates[aggregate.id])
  else
    aggregates[aggregate.id] = aggregate
  end
end
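The check in add_aggregate relies on equal?, which compares object identity rather than value. A minimal sketch of that behaviour, assuming a hypothetical IdOnlyAggregate struct, a hypothetical SketchNonUniqueId error, and a plain Hash standing in for the unit of work:

```ruby
# Hypothetical stand-ins; only the equal? check mirrors the listing above.
class SketchNonUniqueId < StandardError; end
IdOnlyAggregate = Struct.new(:id)

def sketch_add(unit_of_work, aggregate)
  existing = unit_of_work[aggregate.id]
  # equal? is true only when both references point at the very same object.
  if existing && !existing.equal?(aggregate)
    raise SketchNonUniqueId, "duplicate id #{aggregate.id}"
  end
  unit_of_work[aggregate.id] = aggregate
end

unit_of_work = {}
first = IdOnlyAggregate.new('a-1')
sketch_add(unit_of_work, first)
sketch_add(unit_of_work, first) # same instance again: accepted

# A different instance with the same id is == to the first (Struct equality)
# but not equal?, so it is rejected.
duplicate_rejected =
  begin
    sketch_add(unit_of_work, IdOnlyAggregate.new('a-1'))
    false
  rescue SketchNonUniqueId
    true
  end
```

Re-adding the same instance is harmless, while a second object claiming the same id would break the guarantee that one id maps to one instance.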
#clear ⇒ Object
Clears the Unit of Work.
# File 'lib/sequent/core/aggregate_repository.rb', line 133
def clear
  Thread.current[AGGREGATES_KEY] = nil
end
#clear! ⇒ Object
Clears the Unit of Work.
A HasUncommittedEvents is raised when there are uncommitted_events in the Unit of Work.
# File 'lib/sequent/core/aggregate_repository.rb', line 140
def clear!
  fail HasUncommittedEvents if aggregates.values.any? { |x| !x.uncommitted_events.empty? }

  clear
end
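The difference between clear and clear! is the guard on pending events. A small sketch of that guard, assuming a hypothetical DraftAggregate struct and SketchUncommitted error, with a plain Hash in place of the thread-local unit of work (so the sketch empties the Hash instead of resetting a thread-local):

```ruby
# Hypothetical stand-ins; only the guard mirrors the listing above.
class SketchUncommitted < StandardError; end
DraftAggregate = Struct.new(:uncommitted_events)

def sketch_clear!(unit_of_work)
  # Refuse to discard the unit of work while any aggregate has pending events.
  if unit_of_work.values.any? { |a| !a.uncommitted_events.empty? }
    raise SketchUncommitted
  end
  unit_of_work.clear
end

dirty = { 'a-1' => DraftAggregate.new([:created]) }
refused =
  begin
    sketch_clear!(dirty)
    false
  rescue SketchUncommitted
    true
  end

clean = { 'b-2' => DraftAggregate.new([]) }
sketch_clear!(clean)
```

The unguarded clear would have discarded the pending :created event silently, which is the situation clear! exists to catch.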
#commit(command) ⇒ Object
Gets all uncommitted_events from the ‘registered’ aggregates and stores them in the event store.
The events given to the EventStore are ordered by the loading order of the different AggregateRoots. Within each AggregateRoot, events are stored (and therefore published) in the order in which they were ‘apply’-ed.
The command is ‘attached’ for traceability purposes, so we can see which command resulted in which events.
This is all abstracted away if you use the Sequent::Core::CommandService.
# File 'lib/sequent/core/aggregate_repository.rb', line 121
def commit(command)
  updated_aggregates = aggregates.values.reject { |x| x.uncommitted_events.empty? }
  return if updated_aggregates.empty?

  streams_with_events = updated_aggregates.map do |aggregate|
    [aggregate.event_stream, aggregate.uncommitted_events]
  end
  updated_aggregates.each(&:clear_events)
  store_events command, streams_with_events
end
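The collect, clear, store sequence in commit can be tried out without an event store. The sketch below is an illustration under assumptions: PendingAggregate, its apply and clear_events methods, and the Array used as a store are all hypothetical, and the aggregate id stands in for the event stream.

```ruby
# Hypothetical aggregate; clear_events swaps in a new Array so that
# batches collected earlier keep their events.
class PendingAggregate
  attr_reader :id, :uncommitted_events

  def initialize(id)
    @id = id
    @uncommitted_events = []
  end

  def apply(event)
    @uncommitted_events << event
  end

  def clear_events
    @uncommitted_events = []
  end
end

def sketch_commit(command, unit_of_work, store)
  updated = unit_of_work.values.reject { |a| a.uncommitted_events.empty? }
  return if updated.empty?

  # Hash#values keeps insertion order, so batches follow registration order.
  batches = updated.map { |a| [a.id, a.uncommitted_events] }
  updated.each(&:clear_events)
  store << [command, batches]
end

a = PendingAggregate.new('a-1')
b = PendingAggregate.new('b-2')
unit_of_work = { a.id => a, b.id => b }
a.apply(:created)
a.apply(:renamed)

store = []
sketch_commit(:create_command, unit_of_work, store)
second_result = sketch_commit(:noop_command, unit_of_work, store)
```

Aggregate b contributes nothing because it has no pending events, and the second call returns early because the first one already cleared everything.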
#contains_aggregate?(aggregate_id) ⇒ Boolean
Returns whether the event store has an aggregate with the given id.
# File 'lib/sequent/core/aggregate_repository.rb', line 104
def contains_aggregate?(aggregate_id)
  Sequent.configuration.event_store.stream_exists?(aggregate_id) &&
    Sequent.configuration.event_store.events_exists?(aggregate_id)
end
#ensure_exists(aggregate_id, clazz) ⇒ Object
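Returns true if the aggregate exists. If it does not, the underlying load_aggregate call raises AggregateNotFound; if it exists but is not of type clazz, a TypeError is raised.
# File 'lib/sequent/core/aggregate_repository.rb', line 50
def ensure_exists(aggregate_id, clazz)
  !load_aggregate(aggregate_id, clazz).nil?
end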
#load_aggregate(aggregate_id, clazz = nil) ⇒ Object
Loads the aggregate with the given id and class. Returns the one in the current Unit Of Work; otherwise loads it from history.
# File 'lib/sequent/core/aggregate_repository.rb', line 56
def load_aggregate(aggregate_id, clazz = nil)
  load_aggregates([aggregate_id], clazz)[0]
end
#load_aggregates(aggregate_ids, clazz = nil) ⇒ Object
Loads multiple aggregates at once. Returns the ones in the current Unit Of Work; otherwise loads them from history.
Note: This will load all the aggregates in memory, so querying 100s of aggregates with 100s of events could cause memory issues.
Returns all aggregates or raises AggregateNotFound. If clazz is given and one of the aggregates is not of the correct type, a TypeError is raised.
aggregate_ids: The ids of the aggregates to be loaded.
clazz: Optional argument that checks if all aggregates are of type clazz.
# File 'lib/sequent/core/aggregate_repository.rb', line 73
def load_aggregates(aggregate_ids, clazz = nil)
  fail ArgumentError, 'aggregate_ids is required' unless aggregate_ids
  return [] if aggregate_ids.empty?

  unique_ids = aggregate_ids.uniq
  result = aggregates.values_at(*unique_ids).compact
  query_ids = unique_ids - result.map(&:id)

  result += Sequent.configuration.event_store.load_events_for_aggregates(query_ids).map do |stream, events|
    aggregate_class = Class.const_get(stream.aggregate_type)
    aggregate_class.load_from_history(stream, events)
  end

  if result.count != unique_ids.count
    missing_aggregate_ids = unique_ids - result.map(&:id)
    fail AggregateNotFound, missing_aggregate_ids
  end

  if clazz
    result.each do |aggregate|
      fail TypeError, "#{aggregate.class} is not a #{clazz}" unless aggregate.class <= clazz
    end
  end

  result.map do |aggregate|
    aggregates[aggregate.id] = aggregate
  end
end
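The lookup flow of load_aggregates (deduplicate ids, take what is cached, fetch the rest, verify completeness and type) can be followed with plain hashes. This is a sketch under assumptions: Account, SavingsAccount, SketchNotFound and sketch_load are hypothetical, and a Hash of ready-made objects stands in for rebuilding aggregates from event history.

```ruby
# Hypothetical classes; a Hash of id => object replaces the event store.
class SketchNotFound < StandardError; end

class Account
  attr_reader :id

  def initialize(id)
    @id = id
  end
end

class SavingsAccount < Account; end

def sketch_load(ids, cache, history, clazz = nil)
  return [] if ids.empty?

  unique_ids = ids.uniq
  result = cache.values_at(*unique_ids).compact      # already in the unit of work
  query_ids = unique_ids - result.map(&:id)          # still to be fetched
  result += query_ids.map { |id| history[id] }.compact

  if result.count != unique_ids.count
    raise SketchNotFound, (unique_ids - result.map(&:id)).inspect
  end

  if clazz
    result.each do |aggregate|
      # Class#<= is true for the class itself and for its subclasses.
      raise TypeError, "#{aggregate.class} is not a #{clazz}" unless aggregate.class <= clazz
    end
  end

  result.each { |aggregate| cache[aggregate.id] = aggregate }
end

cached = SavingsAccount.new('s-1')
cache = { 's-1' => cached }
history = { 'a-2' => Account.new('a-2') }

loaded = sketch_load(['s-1', 'a-2', 's-1'], cache, history, Account)

wrong_type =
  begin
    sketch_load(['a-2'], cache, history, SavingsAccount)
    false
  rescue TypeError
    true
  end

missing =
  begin
    sketch_load(['missing'], cache, history)
    false
  rescue SketchNotFound
    true
  end
```

Two things stand out in the sketch, and the listing above has the same structure: duplicate ids collapse into one result, and cached aggregates come before freshly loaded ones, so the result order can differ from the order of aggregate_ids.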