Class: EventSourcery::Memory::EventStore
- Inherits:
-
Object
- Object
- EventSourcery::Memory::EventStore
- Includes:
- EventStore::EachByRange
- Defined in:
- lib/event_sourcery/memory/event_store.rb
Overview
In-memory event store.
Note: This is not persisted and is generally used for testing.
Instance Method Summary collapse
-
#add_listeners(listeners) ⇒ Object
Adds a listener or listeners to the memory store.
-
#ensure_one_aggregate(events) ⇒ Object
Ensure all events have the same aggregate.
-
#get_events_for_aggregate_id(id) ⇒ Object
Get all events for the given aggregate.
-
#get_next_from(id, event_types: nil, limit: 1000) ⇒ Object
Retrieve a subset of events.
-
#initialize(events = [], event_builder: EventSourcery.config.event_builder) ⇒ EventStore
constructor
A new instance of EventStore.
-
#latest_event_id(event_types: nil) ⇒ Object
Retrieve the latest event ID.
-
#next_version(aggregate_id) ⇒ Object
Next version for the aggregate.
-
#sink(event_or_events, expected_version: nil) ⇒ Object
Store given events to the in-memory store.
-
#version_for(aggregate_id) ⇒ Object
Current version for the aggregate.
Methods included from EventStore::EachByRange
Constructor Details
#initialize(events = [], event_builder: EventSourcery.config.event_builder) ⇒ EventStore
Returns a new instance of EventStore.
12 13 14 15 16 |
# File 'lib/event_sourcery/memory/event_store.rb', line 12 def initialize(events = [], event_builder: EventSourcery.config.event_builder) @events = events @event_builder = event_builder @listeners = [] end |
Instance Method Details
#add_listeners(listeners) ⇒ Object
Adds a listener or listeners to the memory store. the #process(event) method will execute whenever an event is emitted
120 121 122 |
# File 'lib/event_sourcery/memory/event_store.rb', line 120 def add_listeners(listeners) @listeners.concat(Array(listeners)) end |
#ensure_one_aggregate(events) ⇒ Object
Ensure all events have the same aggregate
110 111 112 113 114 |
# File 'lib/event_sourcery/memory/event_store.rb', line 110 def ensure_one_aggregate(events) unless events.map(&:aggregate_id).uniq.one? raise AtomicWriteToMultipleAggregatesNotSupported end end |
#get_events_for_aggregate_id(id) ⇒ Object
Get all events for the given aggregate
85 86 87 88 |
# File 'lib/event_sourcery/memory/event_store.rb', line 85 def get_events_for_aggregate_id(id) stringified_id = id.to_str @events.select { |event| event.aggregate_id == stringified_id } end |
#get_next_from(id, event_types: nil, limit: 1000) ⇒ Object
Retrieve a subset of events
57 58 59 60 61 62 63 64 65 |
# File 'lib/event_sourcery/memory/event_store.rb', line 57 def get_next_from(id, event_types: nil, limit: 1000) events = if event_types.nil? @events else @events.select { |e| event_types.include?(e.type) } end events.select { |event| event.id >= id }.first(limit) end |
#latest_event_id(event_types: nil) ⇒ Object
Retrieve the latest event ID
71 72 73 74 75 76 77 78 79 |
# File 'lib/event_sourcery/memory/event_store.rb', line 71 def latest_event_id(event_types: nil) events = if event_types.nil? @events else @events.select { |e| event_types.include?(e.type) } end events.empty? ? 0 : events.last.id end |
#next_version(aggregate_id) ⇒ Object
Next version for the aggregate
94 95 96 |
# File 'lib/event_sourcery/memory/event_store.rb', line 94 def next_version(aggregate_id) version_for(aggregate_id) + 1 end |
#sink(event_or_events, expected_version: nil) ⇒ Object
Store given events to the in-memory store
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/event_sourcery/memory/event_store.rb', line 24 def sink(event_or_events, expected_version: nil) events = Array(event_or_events) ensure_one_aggregate(events) if expected_version && version_for(events.first.aggregate_id) != expected_version raise ConcurrencyError end events.each do |event| @events << @event_builder.build( id: @events.size + 1, aggregate_id: event.aggregate_id, type: event.type, version: next_version(event.aggregate_id), body: EventBodySerializer.serialize(event.body), created_at: event.created_at || Time.now.utc, uuid: event.uuid, correlation_id: event.correlation_id, causation_id: event.causation_id, ) end project_events(events) true end |
#version_for(aggregate_id) ⇒ Object
Current version for the aggregate
102 103 104 |
# File 'lib/event_sourcery/memory/event_store.rb', line 102 def version_for(aggregate_id) get_events_for_aggregate_id(aggregate_id).count end |