Class: Synapse::EventStore::Mongo::MongoEventStore
- Inherits:
-
SnapshotEventStore
- Object
- SnapshotEventStore
- Synapse::EventStore::Mongo::MongoEventStore
- Defined in:
- lib/synapse/event_store/mongo/event_store.rb
Overview
Implementation of an event store backed by a MongoDB database.
Instance Method Summary
- #append_events(type_identifier, stream) ⇒ undefined
- #append_snapshot_event(type_identifier, snapshot_event) ⇒ undefined
- #ensure_indexes ⇒ undefined
- #initialize(template, storage_strategy) ⇒ undefined constructor
- #read_events(type_identifier, aggregate_id) ⇒ DomainEventStream
Constructor Details
#initialize(template, storage_strategy) ⇒ undefined
9 10 11 12 |
# File 'lib/synapse/event_store/mongo/event_store.rb', line 9

# Builds an event store around the given collaborators.
#
# @param template [Object] object kept in +@template+; other methods of this
#   class ask it for +event_collection+ and +snapshot_collection+
# @param storage_strategy [Object] object kept in +@storage_strategy+; other
#   methods of this class ask it to create documents, fetch events and ensure indexes
# @return [undefined]
def initialize(template, storage_strategy)
  @template, @storage_strategy = template, storage_strategy
end
Instance Method Details
#append_events(type_identifier, stream) ⇒ undefined
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/synapse/event_store/mongo/event_store.rb', line 46

# Appends every event in the given stream to the event collection.
#
# The stream is materialised with +to_a+, converted to documents by the
# storage strategy and inserted in a single +insert+ call.
#
# @param type_identifier [Object] aggregate type identifier, passed through to
#   the storage strategy
# @param stream [#to_a] stream of events to append
# @raise [Repository::ConcurrencyError] if the insert fails with error code 11000
# @raise [::Mongo::OperationFailure] any other driver failure, re-raised unchanged
# @return [undefined]
def append_events(type_identifier, stream)
  events = stream.to_a
  documents = @storage_strategy.create_documents(type_identifier, events)

  begin
    @template.event_collection.insert(documents)
  rescue ::Mongo::OperationFailure => exception
    # 11000 is MongoDB's duplicate-key error code; it is reported to callers
    # as a concurrency conflict on the aggregate.
    if exception.error_code == 11_000
      raise Repository::ConcurrencyError,
            'Event for this aggregate and sequence number already present'
    end

    # Bare raise re-raises the rescued exception. The original code said
    # `raise ex`, an undefined name, which turned every other driver failure
    # into a NameError.
    raise
  end
end
#append_snapshot_event(type_identifier, snapshot_event) ⇒ undefined
66 67 68 69 |
# File 'lib/synapse/event_store/mongo/event_store.rb', line 66

# Appends a single snapshot event to the snapshot collection.
#
# The event is wrapped in a one-element array so the storage strategy can
# build documents for it the same way it does for a list of events.
#
# @param type_identifier [Object] aggregate type identifier, passed through to
#   the storage strategy
# @param snapshot_event [Object] the snapshot event to store
# @return [undefined]
def append_snapshot_event(type_identifier, snapshot_event)
  snapshot_documents = @storage_strategy.create_documents(type_identifier, [snapshot_event])
  @template.snapshot_collection.insert(snapshot_documents)
end
#ensure_indexes ⇒ undefined
15 16 17 |
# File 'lib/synapse/event_store/mongo/event_store.rb', line 15

# Delegates index creation to the storage strategy by calling its
# +ensure_indexes+ method.
#
# @return [undefined]
def ensure_indexes
  strategy = @storage_strategy
  strategy.ensure_indexes
end
#read_events(type_identifier, aggregate_id) ⇒ DomainEventStream
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/synapse/event_store/mongo/event_store.rb', line 23

# Opens a stream over the events of one aggregate.
#
# When a non-empty snapshot commit is loaded, events are fetched starting
# just after the sequence number of its first entry; otherwise fetching
# starts at sequence number 0.
#
# @param type_identifier [Object] aggregate type identifier
# @param aggregate_id [Object] identifier of the aggregate to read
# @raise [StreamNotFoundError] if no snapshot commit was loaded and the cursor
#   has no events
# @return [DomainEventStream]
def read_events(type_identifier, aggregate_id)
  snapshot_commit = load_last_snapshot(type_identifier, aggregate_id)

  # -1 stands for "nothing read yet", so the .next below yields 0.
  preceding_sequence_number =
    if snapshot_commit && snapshot_commit.size > 0
      snapshot_commit[0].sequence_number
    else
      -1
    end

  cursor = @storage_strategy.fetch_events(
    type_identifier, aggregate_id, preceding_sequence_number.next
  )

  # The cursor is only consulted when no snapshot commit was loaded.
  if !snapshot_commit && !cursor.has_next?
    raise StreamNotFoundError.new(type_identifier, aggregate_id)
  end

  CursorDomainEventStream.new(@storage_strategy, cursor, snapshot_commit, aggregate_id)
end