Class: Sequent::Core::AggregateRepository

Inherits:
Object
Defined in:
lib/sequent/core/aggregate_repository.rb

Overview

Repository for aggregates.

Implements the Unit-Of-Work and Identity-Map patterns to ensure each aggregate is only loaded once per transaction and that you always get the same aggregate instance back.

On commit, all aggregates associated with the Unit-Of-Work are queried for uncommitted events. These events are persisted and then cleared from the aggregate.

The repository keeps track of the Unit-Of-Work per thread, so a single repository instance can be shared between threads.
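The per-thread Unit-Of-Work described above can be illustrated with a minimal sketch. `SketchRepository`, its `KEY`, and its `add` method are hypothetical stand-ins invented for this example, not Sequent's implementation; the only assumption carried over from the source is that the identity map lives in `Thread.current`.

```ruby
# Hypothetical stand-in, not Sequent's implementation.
class SketchRepository
  KEY = :sketch_repository_aggregates

  # Returns the identity map of the current thread, creating it on first use.
  def aggregates
    Thread.current[KEY] ||= {}
  end

  # Registers an aggregate under its id in the current thread's map.
  def add(id, aggregate)
    aggregates[id] = aggregate
  end

  # Drops the current thread's map; other threads are unaffected.
  def clear
    Thread.current[KEY] = nil
  end
end

repo = SketchRepository.new
repo.add('a-1', :aggregate_from_main_thread)

# A second thread using the same repository object gets its own, empty map.
keys_in_other_thread = Thread.new { repo.aggregates.keys }.value

puts keys_in_other_thread.inspect
puts repo.aggregates.keys.inspect
repo.clear
```

Because the state is keyed on the thread rather than stored in the repository object, the object itself holds no mutable data and needs no locking.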

Defined Under Namespace

Classes: AggregateNotFound, NonUniqueAggregateId

Constant Summary

AGGREGATES_KEY = 'Sequent::Core::AggregateRepository::aggregates'.to_sym

Key used in thread-local storage.

Instance Method Details

#add_aggregate(aggregate) ⇒ Object

Adds the given aggregate to the repository (or unit of work).

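Only when commit is called are the aggregates in the unit of work ‘processed’ and all their uncommitted_events stored in the event store.

Raises NonUniqueAggregateId if a different instance with the same id is already registered. Adding the same instance again is allowed.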



# File 'lib/sequent/core/aggregate_repository.rb', line 36

def add_aggregate(aggregate)
  existing = aggregates[aggregate.id]
  if existing && !existing.equal?(aggregate)
    raise NonUniqueAggregateId.new(aggregate, aggregates[aggregate.id])
  else
    aggregates[aggregate.id] = aggregate
  end
end
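The check above relies on `equal?` (object identity) rather than `==`. A minimal sketch of why that matters, using hypothetical stand-ins (`Account`, `DuplicateId`, `register`) that are not part of Sequent:

```ruby
# Hypothetical aggregate stand-in. Struct#== compares field values,
# so two distinct instances with equal fields are == but not equal?.
Account = Struct.new(:id, :name)

class DuplicateId < StandardError; end

# Mirrors the identity check of add_aggregate on a plain Hash.
def register(map, aggregate)
  existing = map[aggregate.id]
  if existing && !existing.equal?(aggregate)
    raise DuplicateId, "duplicate id #{aggregate.id}"
  end
  map[aggregate.id] = aggregate
end

map = {}
first = Account.new('a-1', 'savings')
register(map, first)
register(map, first) # same instance: accepted

begin
  # Equal by value, but a different object: rejected.
  register(map, Account.new('a-1', 'savings'))
rescue DuplicateId => e
  puts e.message
end
```

Comparing by identity keeps the guarantee that every caller in the unit of work holds the very same object for a given id.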

#clear ⇒ Object

Clears the Unit of Work.



# File 'lib/sequent/core/aggregate_repository.rb', line 122

def clear
  Thread.current[AGGREGATES_KEY] = nil
end

#commit(command) ⇒ Object

Gets all uncommitted_events from the ‘registered’ aggregates and stores them in the event store. The command is ‘attached’ for traceability, so we can see which command resulted in which events. Returns without storing anything if no aggregate has uncommitted events.

This is all abstracted away if you use the Sequent::Core::CommandService.



# File 'lib/sequent/core/aggregate_repository.rb', line 111

def commit(command)
  updated_aggregates = aggregates.values.reject { |x| x.uncommitted_events.empty? }
  return if updated_aggregates.empty?
  streams_with_events = updated_aggregates.map do |aggregate|
    [ aggregate.event_stream, aggregate.uncommitted_events ]
  end
  updated_aggregates.each(&:clear_events)
  store_events command, streams_with_events
end
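The flow of commit can be traced with a small self-contained sketch. `SketchAggregate`, `sketch_commit`, and the array used as a store are hypothetical stand-ins; in particular, the sketch assumes that `clear_events` assigns a fresh array instead of emptying the existing one, since the events are collected before they are cleared.

```ruby
# Hypothetical aggregate stand-in exposing the three methods commit uses.
class SketchAggregate
  attr_reader :event_stream, :uncommitted_events

  def initialize(event_stream)
    @event_stream = event_stream
    @uncommitted_events = []
  end

  def apply(event)
    @uncommitted_events << event
  end

  # Assumption of this sketch: assign a new array so that a previously
  # collected reference to the events stays intact.
  def clear_events
    @uncommitted_events = []
  end
end

# Same steps as commit, with a plain Array standing in for the event store.
def sketch_commit(command, aggregates, store)
  updated = aggregates.reject { |a| a.uncommitted_events.empty? }
  return if updated.empty?

  streams_with_events = updated.map { |a| [a.event_stream, a.uncommitted_events] }
  updated.each(&:clear_events)
  store << [command, streams_with_events]
end

store = []
changed = SketchAggregate.new('stream-1')
changed.apply(:account_opened)
unchanged = SketchAggregate.new('stream-2')

sketch_commit(:open_account, [changed, unchanged], store)
puts store.inspect
```

Only `changed` contributes to the stored entry; `unchanged` is skipped because it has no uncommitted events.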

#contains_aggregate?(aggregate_id) ⇒ Boolean

Returns whether the event store has an aggregate with the given id.

Returns:
(Boolean)



# File 'lib/sequent/core/aggregate_repository.rb', line 100

def contains_aggregate?(aggregate_id)
  Sequent.configuration.event_store.stream_exists?(aggregate_id)
end

#ensure_exists(aggregate_id, clazz) ⇒ Object

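Raises AggregateNotFound if no aggregate with the given id exists; otherwise returns true. If clazz is given and the aggregate is not of that type, a TypeError is raised.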



# File 'lib/sequent/core/aggregate_repository.rb', line 46

def ensure_exists(aggregate_id, clazz)
  !load_aggregate(aggregate_id, clazz).nil?
end

#load_aggregate(aggregate_id, clazz = nil) ⇒ Object

Loads the aggregate with the given id and, if clazz is given, checks that it is of that class. Returns the instance in the current Unit-Of-Work if there is one; otherwise loads it from history.



# File 'lib/sequent/core/aggregate_repository.rb', line 52

def load_aggregate(aggregate_id, clazz = nil)
  load_aggregates([aggregate_id], clazz)[0]
end

#load_aggregates(aggregate_ids, clazz = nil) ⇒ Object

Loads multiple aggregates at once. Returns the ones already in the current Unit-Of-Work; the others are loaded from history.

Note: This loads all the aggregates into memory, so querying hundreds of aggregates with hundreds of events each could cause memory issues.

Returns all aggregates or raises AggregateNotFound. If clazz is given and one of the aggregates is not of the correct type, a TypeError is raised.

aggregate_ids: The ids of the aggregates to be loaded.
clazz: Optional argument that checks if all aggregates are of type clazz.



# File 'lib/sequent/core/aggregate_repository.rb', line 69

def load_aggregates(aggregate_ids, clazz = nil)
  fail ArgumentError.new('aggregate_ids is required') unless aggregate_ids
  return [] if aggregate_ids.empty?

  _aggregate_ids = aggregate_ids.uniq
  _aggregates = aggregates.values_at(*_aggregate_ids).compact
  _query_ids = _aggregate_ids - _aggregates.map(&:id)

  _aggregates += Sequent.configuration.event_store.load_events_for_aggregates(_query_ids).map do |stream, events|
    aggregate_class = Class.const_get(stream.aggregate_type)
    aggregate_class.load_from_history(stream, events)
  end

  if _aggregates.count != _aggregate_ids.count
    missing_aggregate_ids = _aggregate_ids - _aggregates.map(&:id)
    raise AggregateNotFound.new(missing_aggregate_ids)
  end

  if clazz
    _aggregates.each do |aggregate|
      raise TypeError, "#{aggregate.class} is not a #{clazz}" if !(aggregate.class <= clazz)
    end
  end

  _aggregates.map do |aggregate|
    aggregates[aggregate.id] = aggregate
  end
end
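Two steps of the method above are easy to misread: the split between cached instances and ids that still need a query, and the type check with `<=`. The following sketch isolates both. `BaseAccount`, `SavingsAccount`, `split_cached`, and `ensure_type!` are hypothetical names made up for this example.

```ruby
# Hypothetical aggregate classes for illustration only.
class BaseAccount
  attr_reader :id

  def initialize(id)
    @id = id
  end
end

class SavingsAccount < BaseAccount; end

# Splits the requested ids into instances already in the unit of work
# and ids that still have to be fetched from the event store.
def split_cached(cache, ids)
  unique_ids = ids.uniq
  cached = cache.values_at(*unique_ids).compact
  [cached, unique_ids - cached.map(&:id)]
end

# Module#<= is true for the same class or a subclass, false for a
# superclass, and nil for unrelated classes; false and nil both raise here.
def ensure_type!(aggregates, clazz)
  aggregates.each do |aggregate|
    raise TypeError, "#{aggregate.class} is not a #{clazz}" unless aggregate.class <= clazz
  end
end

cache = { 'a-1' => SavingsAccount.new('a-1') }
cached, to_query = split_cached(cache, %w[a-1 a-2 a-1])

puts cached.map(&:id).inspect
puts to_query.inspect
ensure_type!(cached, BaseAccount) # passes: SavingsAccount is a subclass
```

Passing a base class as clazz therefore accepts all of its subclasses, while passing a subclass rejects instances of the base class.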