Class: Sequent::Core::AggregateRepository
- Inherits:
-
Object
- Object
- Sequent::Core::AggregateRepository
- Defined in:
- lib/sequent/core/aggregate_repository.rb
Overview
Repository for aggregates.
Implements the Unit-Of-Work and Identity-Map patterns to ensure each aggregate is only loaded once per transaction and that you always get the same aggregate instance back.
On commit all aggregates associated with the Unit-Of-Work are queried for uncommitted events. After persisting these events the uncommitted events are cleared from the aggregate.
The repository is keeps track of the Unit-Of-Work per thread, so can be shared between threads.
Defined Under Namespace
Classes: AggregateNotFound, NonUniqueAggregateId
Constant Summary collapse
- AGGREGATES_KEY =
Key used in thread local
'Sequent::Core::AggregateRepository::aggregates'.to_sym
Instance Attribute Summary collapse
-
#event_store ⇒ Object
readonly
Returns the value of attribute event_store.
Instance Method Summary collapse
-
#add_aggregate(aggregate) ⇒ Object
Adds the given aggregate to the repository (or unit of work).
-
#clear ⇒ Object
Clears the Unit of Work.
-
#commit(command) ⇒ Object
Gets all uncommitted_events from the ‘registered’ aggregates and stores them in the event store.
-
#contains_aggregate?(aggregate_id) ⇒ Boolean
Returns whether the event store has an aggregate with the given id.
-
#ensure_exists(aggregate_id, clazz) ⇒ Object
Throws exception if not exists.
-
#initialize(event_store) ⇒ AggregateRepository
constructor
A new instance of AggregateRepository.
-
#load_aggregate(aggregate_id, clazz = nil) ⇒ Object
Loads aggregate by given id and class Returns the one in the current Unit Of Work otherwise loads it from history.
Constructor Details
#initialize(event_store) ⇒ AggregateRepository
33 34 35 |
# File 'lib/sequent/core/aggregate_repository.rb', line 33 def initialize(event_store) @event_store = event_store end |
Instance Attribute Details
#event_store ⇒ Object (readonly)
Returns the value of attribute event_store.
19 20 21 |
# File 'lib/sequent/core/aggregate_repository.rb', line 19 def event_store @event_store end |
Instance Method Details
#add_aggregate(aggregate) ⇒ Object
Adds the given aggregate to the repository (or unit of work).
Only when commit is called all aggregates in the unit of work are ‘processed’ and all uncammited_events are stored in the event_store
42 43 44 45 46 47 48 49 |
# File 'lib/sequent/core/aggregate_repository.rb', line 42 def add_aggregate(aggregate) existing = aggregates[aggregate.id] if existing && !existing.equal?(aggregate) raise NonUniqueAggregateId.new(aggregate, aggregates[aggregate.id]) else aggregates[aggregate.id] = aggregate end end |
#clear ⇒ Object
Clears the Unit of Work.
95 96 97 |
# File 'lib/sequent/core/aggregate_repository.rb', line 95 def clear Thread.current[AGGREGATES_KEY] = nil end |
#commit(command) ⇒ Object
Gets all uncommitted_events from the ‘registered’ aggregates and stores them in the event store. The command is ‘attached’ for traceability purpose so we can see which command resulted in which events.
This is all abstracted away if you use the Sequent::Core::CommandService
84 85 86 87 88 89 90 91 92 |
# File 'lib/sequent/core/aggregate_repository.rb', line 84 def commit(command) updated_aggregates = aggregates.values.reject { |x| x.uncommitted_events.empty? } return if updated_aggregates.empty? streams_with_events = updated_aggregates.map do |aggregate| [ aggregate.event_stream, aggregate.uncommitted_events ] end updated_aggregates.each(&:clear_events) store_events command, streams_with_events end |
#contains_aggregate?(aggregate_id) ⇒ Boolean
Returns whether the event store has an aggregate with the given id
73 74 75 |
# File 'lib/sequent/core/aggregate_repository.rb', line 73 def contains_aggregate?(aggregate_id) @event_store.stream_exists?(aggregate_id) end |
#ensure_exists(aggregate_id, clazz) ⇒ Object
Throws exception if not exists.
52 53 54 |
# File 'lib/sequent/core/aggregate_repository.rb', line 52 def ensure_exists(aggregate_id, clazz) !load_aggregate(aggregate_id, clazz).nil? end |
#load_aggregate(aggregate_id, clazz = nil) ⇒ Object
Loads aggregate by given id and class Returns the one in the current Unit Of Work otherwise loads it from history.
58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/sequent/core/aggregate_repository.rb', line 58 def load_aggregate(aggregate_id, clazz = nil) result = aggregates.fetch(aggregate_id) do |_| stream, events = @event_store.load_events(aggregate_id) raise AggregateNotFound.new(aggregate_id) unless stream aggregate_class = Class.const_get(stream.aggregate_type) aggregates[aggregate_id] = aggregate_class.load_from_history(stream, events) end raise TypeError, "#{result.class} is not a #{clazz}" if result && clazz && !(result.class <= clazz) result end |