Class: Sequent::Core::EventStore
- Inherits:
-
Object
- Object
- Sequent::Core::EventStore
- Extended by:
- Forwardable
- Includes:
- ActiveRecord::ConnectionAdapters::Quoting
- Defined in:
- lib/sequent/core/event_store.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#configuration ⇒ Object
Returns the value of attribute configuration.
Instance Method Summary collapse
-
#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object
Returns the ids of aggregates that need a new snapshot.
-
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
- #find_event_stream(aggregate_id) ⇒ Object
-
#initialize(configuration = Sequent.configuration) ⇒ EventStore
constructor
A new instance of EventStore.
-
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number.
-
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
- #stream_exists?(aggregate_id) ⇒ Boolean
Constructor Details
#initialize(configuration = Sequent.configuration) ⇒ EventStore
Returns a new instance of EventStore.
15 16 17 18 |
# File 'lib/sequent/core/event_store.rb', line 15
#
# Builds an event store bound to the given configuration.
#
# @param configuration the configuration object to use; when omitted,
#   Sequent.configuration is used.
def initialize(configuration = Sequent.configuration)
  # Fresh cache held in @event_types; how it is used lies outside this excerpt.
  @event_types = ThreadSafe::Cache.new
  self.configuration = configuration
end
Instance Attribute Details
#configuration ⇒ Object
Returns the value of attribute configuration.
12 13 14 |
# File 'lib/sequent/core/event_store.rb', line 12
#
# Reader for the configuration assigned to this event store.
#
# @return [Object] whatever is currently stored in @configuration
def configuration
  @configuration
end
Instance Method Details
#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object
Returns the ids of aggregates that need a new snapshot.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/sequent/core/event_store.rb', line 69
#
# Returns the ids of aggregates that need a new snapshot.
#
# A stream qualifies when its snapshot_threshold is set and is less than or
# equal to the difference between the highest sequence_number of its
# non-snapshot events and the highest sequence_number of its snapshot events
# (0 when it has no snapshot yet). Matching stream rows are selected
# FOR UPDATE, ordered by aggregate_id.
#
# @param last_aggregate_id only aggregates with an id greater than this one
#   are considered; nil is treated as the empty string by the query.
# @param limit maximum number of ids to return (default 10).
# @return [Array] the aggregate_id values of the matching streams.
def aggregates_that_need_snapshots(last_aggregate_id, limit = 10)
  streams = quote_table_name(stream_record_class.table_name)
  events = quote_table_name(event_record_class.table_name)
  snapshot_type = quote(snapshot_event_class.name)

  sql = %Q{
SELECT aggregate_id
  FROM #{streams} stream
 WHERE aggregate_id > COALESCE(#{quote(last_aggregate_id)}, '')
   AND snapshot_threshold IS NOT NULL
   AND snapshot_threshold <= (
         (SELECT MAX(events.sequence_number) FROM #{events} events WHERE events.event_type <> #{snapshot_type} AND stream.aggregate_id = events.aggregate_id) -
         COALESCE((SELECT MAX(snapshots.sequence_number) FROM #{events} snapshots WHERE snapshots.event_type = #{snapshot_type} AND stream.aggregate_id = snapshots.aggregate_id), 0))
 ORDER BY aggregate_id
 LIMIT #{quote(limit)}
 FOR UPDATE
}

  rows = event_record_class.connection.select_all(sql)
  rows.map { |row| row['aggregate_id'] }
end
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
`streams_with_events` is an enumerable of pairs, each mapping a `StreamRecord` to an array of uncommitted `Event`s.
27 28 29 30 |
# File 'lib/sequent/core/event_store.rb', line 27
#
# Stores the given events and then publishes them to the registered
# event handlers.
#
# @param command the command passed through to store_events.
# @param streams_with_events enumerable of pairs whose second element is the
#   collection of events to store and publish.
def commit_events(command, streams_with_events)
  store_events(command, streams_with_events)

  # Only the events of each pair are published; the first element is ignored.
  uncommitted = streams_with_events.flat_map { |_stream, events| events }
  publish_events(uncommitted, event_handlers)
end
#find_event_stream(aggregate_id) ⇒ Object
87 88 89 90 91 92 93 94 |
# File 'lib/sequent/core/event_store.rb', line 87
#
# Looks up the event stream stored for an aggregate.
#
# @param aggregate_id id of the aggregate whose stream is wanted.
# @return the record's event_stream, or nil when no stream record matches.
def find_event_stream(aggregate_id)
  stream = stream_record_class.where(aggregate_id: aggregate_id).first
  return nil unless stream

  stream.event_stream
end
#load_events(aggregate_id) ⇒ Object
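Returns the aggregate's event stream together with its events ordered by sequence_number, starting at the most recent snapshot event when one exists. Returns nil when no stream is found for the aggregate.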
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/sequent/core/event_store.rb', line 35
#
# Loads the event stream and the events of an aggregate.
#
# Events are read from the latest snapshot event onwards (from the start when
# there is no snapshot), ordered by sequence_number; when a snapshot event
# shares a sequence_number with another event, the snapshot comes first.
#
# @param aggregate_id id of the aggregate to load.
# @return [Array, nil] a two-element array of the stream's event_stream and
#   the deserialized events, or nil when no stream record exists.
def load_events(aggregate_id)
  stream = stream_record_class.where(aggregate_id: aggregate_id).first
  return nil unless stream

  events_table = quote_table_name(event_record_class.table_name)
  snapshot_type = quote(snapshot_event_class.name)
  quoted_id = quote(aggregate_id)

  rows = event_record_class.connection.select_all(%Q{
SELECT event_type, event_json
  FROM #{events_table}
 WHERE aggregate_id = #{quoted_id}
   AND sequence_number >= COALESCE((SELECT MAX(sequence_number) FROM #{events_table} WHERE event_type = #{snapshot_type} AND aggregate_id = #{quoted_id}), 0)
 ORDER BY sequence_number ASC, (CASE event_type WHEN #{snapshot_type} THEN 0 ELSE 1 END) ASC
})

  # Use plain `map`: on the query result `map!` was only an alias of `map`,
  # and that alias was deprecated and later removed from ActiveRecord::Result.
  events = rows.map { |event_hash| deserialize_event(event_hash) }

  [stream.event_stream, events]
end
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
61 62 63 64 |
# File 'lib/sequent/core/event_store.rb', line 61
#
# Replays events to the registered event handlers.
#
# The caller's block supplies the raw events: it is invoked once and must
# return a collection of event hashes, each of which is deserialized before
# being published.
#
# @yieldreturn a collection (responding to map) of event hashes.
def replay_events
  event_hashes = yield
  replayed = event_hashes.map { |event_hash| deserialize_event(event_hash) }
  publish_events(replayed, event_handlers)
end
#stream_exists?(aggregate_id) ⇒ Boolean
53 54 55 |
# File 'lib/sequent/core/event_store.rb', line 53
#
# Tells whether a stream record exists for the given aggregate.
#
# @param aggregate_id id of the aggregate to check.
# @return [Boolean] true when a stream record with that aggregate_id exists.
def stream_exists?(aggregate_id)
  matching_streams = stream_record_class.where(aggregate_id: aggregate_id)
  matching_streams.exists?
end