Class: Sequent::Core::AggregateRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/sequent/core/aggregate_repository.rb

Overview

Repository for aggregates.

Implements the Unit-Of-Work and Identity-Map patterns to ensure each aggregate is only loaded once per transaction and that you always get the same aggregate instance back.

On commit all aggregates associated with the Unit-Of-Work are queried for uncommitted events. After persisting these events the uncommitted events are cleared from the aggregate.

The repository is keeps track of the Unit-Of-Work per thread, so can be shared between threads.

Defined Under Namespace

Classes: AggregateNotFound, NonUniqueAggregateId

Constant Summary collapse

AGGREGATES_KEY =

Key used in thread local

'Sequent::Core::AggregateRepository::aggregates'.to_sym

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(event_store) ⇒ AggregateRepository



33
34
35
# File 'lib/sequent/core/aggregate_repository.rb', line 33

def initialize(event_store)
  @event_store = event_store
end

Instance Attribute Details

#event_storeObject (readonly)

Returns the value of attribute event_store.



19
20
21
# File 'lib/sequent/core/aggregate_repository.rb', line 19

def event_store
  @event_store
end

Instance Method Details

#add_aggregate(aggregate) ⇒ Object

Adds the given aggregate to the repository (or unit of work).

Only when commit is called all aggregates in the unit of work are ‘processed’ and all uncammited_events are stored in the event_store



42
43
44
45
46
47
48
49
# File 'lib/sequent/core/aggregate_repository.rb', line 42

def add_aggregate(aggregate)
  existing = aggregates[aggregate.id]
  if existing && !existing.equal?(aggregate)
    raise NonUniqueAggregateId.new(aggregate, aggregates[aggregate.id])
  else
    aggregates[aggregate.id] = aggregate
  end
end

#clearObject

Clears the Unit of Work.



95
96
97
# File 'lib/sequent/core/aggregate_repository.rb', line 95

def clear
  Thread.current[AGGREGATES_KEY] = nil
end

#commit(command) ⇒ Object

Gets all uncommitted_events from the ‘registered’ aggregates and stores them in the event store. The command is ‘attached’ for traceability purpose so we can see which command resulted in which events.

This is all abstracted away if you use the Sequent::Core::CommandService



84
85
86
87
88
89
90
91
92
# File 'lib/sequent/core/aggregate_repository.rb', line 84

def commit(command)
  updated_aggregates = aggregates.values.reject { |x| x.uncommitted_events.empty? }
  return if updated_aggregates.empty?
  streams_with_events = updated_aggregates.map do |aggregate|
    [ aggregate.event_stream, aggregate.uncommitted_events ]
  end
  updated_aggregates.each(&:clear_events)
  store_events command, streams_with_events
end

#contains_aggregate?(aggregate_id) ⇒ Boolean

Returns whether the event store has an aggregate with the given id



73
74
75
# File 'lib/sequent/core/aggregate_repository.rb', line 73

def contains_aggregate?(aggregate_id)
  @event_store.stream_exists?(aggregate_id)
end

#ensure_exists(aggregate_id, clazz) ⇒ Object

Throws exception if not exists.



52
53
54
# File 'lib/sequent/core/aggregate_repository.rb', line 52

def ensure_exists(aggregate_id, clazz)
  !load_aggregate(aggregate_id, clazz).nil?
end

#load_aggregate(aggregate_id, clazz = nil) ⇒ Object

Loads aggregate by given id and class Returns the one in the current Unit Of Work otherwise loads it from history.

Raises:

  • (TypeError)


58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/sequent/core/aggregate_repository.rb', line 58

def load_aggregate(aggregate_id, clazz = nil)
  result = aggregates.fetch(aggregate_id) do |_|
    stream, events = @event_store.load_events(aggregate_id)
    raise AggregateNotFound.new(aggregate_id) unless stream
    aggregate_class = Class.const_get(stream.aggregate_type)
    aggregates[aggregate_id] = aggregate_class.load_from_history(stream, events)
  end

  raise TypeError, "#{result.class} is not a #{clazz}" if result && clazz && !(result.class <= clazz)

  result
end