Class: Synapse::EventStore::Mongo::MongoEventStore

Inherits:
SnapshotEventStore
  • Object
show all
Defined in:
lib/synapse/event_store/mongo/event_store.rb

Overview

Implementation of an event store backed by a Mongo database

Instance Method Summary collapse

Constructor Details

#initialize(template, storage_strategy) ⇒ undefined

Parameters:



9
10
11
12
# File 'lib/synapse/event_store/mongo/event_store.rb', line 9

# Builds an event store on top of the given collaborators.
#
# @param template [Object] source of the Mongo collections; other methods of
#   this class call +event_collection+ and +snapshot_collection+ on it
# @param storage_strategy [Object] converts events to documents and queries
#   them back (+create_documents+, +fetch_events+, +ensure_indexes+)
# @return [undefined]
def initialize(template, storage_strategy)
  @template = template
  @storage_strategy = storage_strategy
end

Instance Method Details

#append_events(type_identifier, stream) ⇒ undefined

Parameters:

  • type_identifier (String)

    Type descriptor of the aggregate to append to

  • stream (DomainEventStream)

Returns:

  • (undefined)

Raises:

  • (EventStoreError)

    If an error occurs while appending the stream to the store



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/synapse/event_store/mongo/event_store.rb', line 46

# Appends every event of the given stream to the event collection.
#
# The stream is materialised with +to_a+, converted to documents by the
# storage strategy and inserted in a single call.
#
# @param type_identifier [String] type descriptor of the aggregate to append to
# @param stream [DomainEventStream] events to append
# @raise [Repository::ConcurrencyError] if Mongo reports a duplicate key,
#   i.e. an event with the same aggregate and sequence number already exists
# @raise [::Mongo::OperationFailure] for any other failure reported by the
#   driver; the original exception is re-raised unchanged
# @return [undefined]
def append_events(type_identifier, stream)
  events = stream.to_a
  documents = @storage_strategy.create_documents type_identifier, events

  begin
    @template.event_collection.insert documents
  rescue ::Mongo::OperationFailure => exception
    # 11000 is MongoDB's duplicate key error code. Anything else is not a
    # concurrency conflict, so re-raise the driver error as it was received.
    raise unless exception.error_code == 11000

    raise Repository::ConcurrencyError,
      'Event for this aggregate and sequence number already present'
  end
end

#append_snapshot_event(type_identifier, snapshot_event) ⇒ undefined

Parameters:

  • type_identifier (String)

    Type descriptor of the aggregate to append to

  • snapshot_event (DomainEventMessage)

Returns:

  • (undefined)

Raises:

  • (EventStoreError)

    If an error occurs while appending the event to the store



66
67
68
69
# File 'lib/synapse/event_store/mongo/event_store.rb', line 66

# Stores a single snapshot event in the snapshot collection.
#
# The event is wrapped in a one-element array so the storage strategy can
# build documents for it the same way it does for a list of events.
#
# @param type_identifier [String] type descriptor of the aggregate to append to
# @param snapshot_event [DomainEventMessage] snapshot to store
# @return [undefined]
def append_snapshot_event(type_identifier, snapshot_event)
  snapshot_documents = @storage_strategy.create_documents(type_identifier, [snapshot_event])
  @template.snapshot_collection.insert(snapshot_documents)
end

#ensure_indexes ⇒ undefined

Returns:

  • (undefined)


15
16
17
# File 'lib/synapse/event_store/mongo/event_store.rb', line 15

# Asks the storage strategy to make sure its indexes exist.
#
# This method only delegates; which indexes are involved is decided by the
# storage strategy.
#
# @return [undefined]
def ensure_indexes
  strategy = @storage_strategy
  strategy.ensure_indexes
end

#read_events(type_identifier, aggregate_id) ⇒ DomainEventStream

Parameters:

  • type_identifier (String)

    Type descriptor of the aggregate to retrieve

  • aggregate_id (Object)

Returns:

  • (DomainEventStream)

Raises:

  • (EventStoreError)

    If an error occurs while reading the stream from the store



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/synapse/event_store/mongo/event_store.rb', line 23

# Opens a stream over the events of one aggregate.
#
# If +load_last_snapshot+ yields a non-empty commit, reading resumes at the
# sequence number following that commit's first entry; otherwise it starts at
# sequence number 0.
#
# @param type_identifier [String] type descriptor of the aggregate to retrieve
# @param aggregate_id [Object] identifier of the aggregate
# @raise [StreamNotFoundError] if +load_last_snapshot+ returned nothing and
#   the event cursor has no documents
# @return [DomainEventStream] a CursorDomainEventStream built from the storage
#   strategy, the cursor, the snapshot commit and the aggregate identifier
def read_events(type_identifier, aggregate_id)
  snapshot_commit = load_last_snapshot type_identifier, aggregate_id

  # Resume right after the snapshot when there is one, else from the beginning
  start_at =
    if snapshot_commit && snapshot_commit.size > 0
      snapshot_commit[0].sequence_number.next
    else
      0
    end

  cursor = @storage_strategy.fetch_events(type_identifier, aggregate_id, start_at)

  # Only the absence of a snapshot commit together with an empty cursor counts
  # as "not found"; an empty but present commit does not trigger the error.
  if !snapshot_commit && !cursor.has_next?
    raise StreamNotFoundError.new(type_identifier, aggregate_id)
  end

  CursorDomainEventStream.new(@storage_strategy, cursor, snapshot_commit, aggregate_id)
end