Class: Sequent::Core::EventStore
- Inherits:
-
Object
- Object
- Sequent::Core::EventStore
- Extended by:
- Forwardable
- Includes:
- ActiveRecord::ConnectionAdapters::Quoting
- Defined in:
- lib/sequent/core/event_store.rb
Defined Under Namespace
Classes: DeserializeEventError, NoEventTypesCache, OptimisticLockingError
Constant Summary collapse
- PRINT_PROGRESS =
->(progress, done, _) do if done Sequent.logger.debug "Done replaying #{progress} events" else Sequent.logger.debug "Replayed #{progress} events" end end
Instance Method Summary collapse
- #aggregate_query(aggregate_id) ⇒ Object
-
#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object
Returns the ids of aggregates that need a new snapshot.
-
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
- #events_exists?(aggregate_id) ⇒ Boolean
- #find_event_stream(aggregate_id) ⇒ Object
-
#initialize(cache_event_types: true) ⇒ EventStore
constructor
A new instance of EventStore.
-
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number, loading them from the latest snapshot event onwards, if a snapshot is present.
- #load_events_for_aggregates(aggregate_ids) ⇒ Object
-
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
-
#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an EventRecord cursor from the given block.
-
#stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ⇒ Object
Returns all events for the AggregateRoot ordered by sequence_number, disregarding snapshot events.
- #stream_exists?(aggregate_id) ⇒ Boolean
Constructor Details
#initialize(cache_event_types: true) ⇒ EventStore
Returns a new instance of EventStore.
38 39 40 41 42 43 44 |
# File 'lib/sequent/core/event_store.rb', line 38 def initialize(cache_event_types: true) @event_types = if cache_event_types ThreadSafe::Cache.new else NoEventTypesCache.new end end |
Instance Method Details
#aggregate_query(aggregate_id) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/sequent/core/event_store.rb', line 126 def aggregate_query(aggregate_id) <<~SQL.chomp ( SELECT event_type, event_json FROM #{quote_table_name Sequent.configuration.event_record_class.table_name} AS o WHERE aggregate_id = #{quote(aggregate_id)} AND sequence_number >= COALESCE((SELECT MAX(sequence_number) FROM #{quote_table_name Sequent.configuration.event_record_class.table_name} AS i WHERE event_type = #{quote Sequent.configuration.snapshot_event_class.name} AND i.aggregate_id = #{quote(aggregate_id)}), 0) ORDER BY sequence_number ASC, (CASE event_type WHEN #{quote Sequent.configuration.snapshot_event_class.name} THEN 0 ELSE 1 END) ASC ) SQL end |
#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object
Returns the ids of aggregates that need a new snapshot.
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/sequent/core/event_store.rb', line 195 def aggregates_that_need_snapshots(last_aggregate_id, limit = 10) stream_table = quote_table_name Sequent.configuration.stream_record_class.table_name event_table = quote_table_name Sequent.configuration.event_record_class.table_name query = <<~SQL.chomp SELECT aggregate_id FROM #{stream_table} stream WHERE aggregate_id::varchar > COALESCE(#{quote last_aggregate_id}, '') AND snapshot_threshold IS NOT NULL AND snapshot_threshold <= ( (SELECT MAX(events.sequence_number) FROM #{event_table} events WHERE events.event_type <> #{quote Sequent.configuration.snapshot_event_class.name} AND stream.aggregate_id = events.aggregate_id) - COALESCE((SELECT MAX(snapshots.sequence_number) FROM #{event_table} snapshots WHERE snapshots.event_type = #{quote Sequent.configuration.snapshot_event_class.name} AND stream.aggregate_id = snapshots.aggregate_id), 0)) ORDER BY aggregate_id LIMIT #{quote limit} FOR UPDATE SQL Sequent.configuration.event_record_class.connection.select_all(query).map { |x| x['aggregate_id'] } end |
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
The events are published according to the order in the tail of the given streams_with_events array pair.
57 58 59 60 61 62 63 64 |
# File 'lib/sequent/core/event_store.rb', line 57 def commit_events(command, streams_with_events) fail ArgumentError, 'command is required' if command.nil? Sequent.logger.debug("[EventStore] Committing events for command #{command.class}") store_events(command, streams_with_events) publish_events(streams_with_events.flat_map { |_, events| events }) end |
#events_exists?(aggregate_id) ⇒ Boolean
145 146 147 |
# File 'lib/sequent/core/event_store.rb', line 145 def events_exists?(aggregate_id) Sequent.configuration.event_record_class.exists?(aggregate_id: aggregate_id) end |
#find_event_stream(aggregate_id) ⇒ Object
213 214 215 216 |
# File 'lib/sequent/core/event_store.rb', line 213 def find_event_stream(aggregate_id) record = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_id).first record&.event_stream end |
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number, loading them from the latest snapshot event onwards, if a snapshot is present.
100 101 102 |
# File 'lib/sequent/core/event_store.rb', line 100 def load_events(aggregate_id) load_events_for_aggregates([aggregate_id])[0] end |
#load_events_for_aggregates(aggregate_ids) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/sequent/core/event_store.rb', line 104 def load_events_for_aggregates(aggregate_ids) return [] if aggregate_ids.none? streams = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_ids) query = aggregate_ids.uniq.map { |aggregate_id| aggregate_query(aggregate_id) }.join(' UNION ALL ') events = Sequent.configuration.event_record_class.connection.select_all(query).map do |event_hash| deserialize_event(event_hash) end events .group_by(&:aggregate_id) .map do |aggregate_id, es| [ streams.find do |stream_record| stream_record.aggregate_id == aggregate_id end.event_stream, es, ] end end |
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
DEPRECATED: use replay_events_from_cursor instead.
153 154 155 156 157 |
# File 'lib/sequent/core/event_store.rb', line 153 def replay_events warn '[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`' events = yield.map { |event_hash| deserialize_event(event_hash) } publish_events(events) end |
#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an EventRecord cursor from the given block.
Prefer this replay method if your db adapter supports cursors.
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/sequent/core/event_store.rb', line 166 def replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) progress = 0 cursor = get_events.call ids_replayed = [] cursor.each_row(block_size: block_size).each do |record| event = deserialize_event(record) publish_events([event]) progress += 1 ids_replayed << record['id'] if progress % block_size == 0 on_progress[progress, false, ids_replayed] ids_replayed.clear end end on_progress[progress, true, ids_replayed] end |
#stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ⇒ Object
Returns all events for the AggregateRoot ordered by sequence_number, disregarding snapshot events.
This streaming is done in batches to prevent loading many events in memory all at once. A use case for ignoring the snapshots is when events of a nested AggregateRoot need to be loaded up until a certain moment in time.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/sequent/core/event_store.rb', line 75 def stream_events_for_aggregate(aggregate_id, load_until: nil, &block) stream = find_event_stream(aggregate_id) fail ArgumentError, 'no stream found for this aggregate' if stream.blank? q = Sequent .configuration .event_record_class .where(aggregate_id: aggregate_id) .where.not(event_type: Sequent.configuration.snapshot_event_class.name) .order(:sequence_number) q = q.where('created_at < ?', load_until) if load_until.present? has_events = false q.select('event_type, event_json').each_row do |event_hash| has_events = true event = deserialize_event(event_hash) block.call([stream, event]) end fail ArgumentError, 'no events for this aggregate' unless has_events end |
#stream_exists?(aggregate_id) ⇒ Boolean
141 142 143 |
# File 'lib/sequent/core/event_store.rb', line 141 def stream_exists?(aggregate_id) Sequent.configuration.stream_record_class.exists?(aggregate_id: aggregate_id) end |