Class: Sequent::Core::EventStore
- Inherits:
-
Object
- Object
- Sequent::Core::EventStore
- Extended by:
- Forwardable
- Includes:
- ActiveRecord::ConnectionAdapters::Quoting, Helpers::PgsqlHelpers, SnapshotStore
- Defined in:
- lib/sequent/core/event_store.rb
Defined Under Namespace
Classes: DeserializeEventError, OptimisticLockingError
Constant Summary collapse
- PRINT_PROGRESS =
->(progress, done, _) do next unless Sequent.logger.debug? if done Sequent.logger.debug("Done replaying #{progress} events") else Sequent.logger.debug("Replayed #{progress} events") end end
Instance Method Summary collapse
-
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
-
#event_streams_enumerator(aggregate_type: nil, group_size: 100) ⇒ Object
Returns an enumerator that yields aggregate ids in blocks of ‘group_size` arrays.
- #events_exists?(aggregate_id) ⇒ Boolean
- #find_event_stream(aggregate_id) ⇒ Object
- #load_event(aggregate_id, sequence_number) ⇒ Object
-
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number, loading them from the latest snapshot event onwards, if a snapshot is present.
- #load_events_for_aggregates(aggregate_ids) ⇒ Object
- #load_events_since_marked_position(mark) ⇒ Object
- #permanently_delete_commands_without_events(aggregate_id:) ⇒ Object
- #permanently_delete_event_stream(aggregate_id) ⇒ Object
- #permanently_delete_event_streams(aggregate_ids) ⇒ Object
- #position_mark ⇒ Object
-
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
-
#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an ‘EventRecord` cursor from the given block.
-
#stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ⇒ Object
Returns all events for the AggregateRoot ordered by sequence_number, disregarding snapshots.
- #stream_exists?(aggregate_id) ⇒ Boolean
- #update_unique_keys(event_streams) ⇒ Object
Methods included from SnapshotStore
#aggregates_that_need_snapshots, #clear_aggregate_for_snapshotting, #clear_aggregates_for_snapshotting_with_last_event_before, #delete_all_snapshots, #delete_snapshots_before, #load_latest_snapshot, #mark_aggregate_for_snapshotting, #select_aggregates_for_snapshotting, #store_snapshots
Methods included from Helpers::PgsqlHelpers
#call_procedure, #query_function
Instance Method Details
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
The events are published according to the order in the tail of the given ‘streams_with_events` array pair.
67 68 69 70 71 72 73 74 |
# File 'lib/sequent/core/event_store.rb', line 67 def commit_events(command, streams_with_events) fail ArgumentError, 'command is required' if command.nil? Sequent.logger.debug("[EventStore] Committing events for command #{command.class}") store_events(command, streams_with_events) publish_events(streams_with_events.flat_map { |_, events| events }) end |
#event_streams_enumerator(aggregate_type: nil, group_size: 100) ⇒ Object
Returns an enumerator that yields aggregate ids in blocks of ‘group_size` arrays. Optionally the aggregate root type can be specified (as a string) to only yield aggregate ids of the indicated type.
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/sequent/core/event_store.rb', line 225 def event_streams_enumerator(aggregate_type: nil, group_size: 100) Enumerator.new do |yielder| last_aggregate_id = nil loop do aggregate_rows = ActiveRecord::Base.connection.exec_query( 'SELECT aggregate_id FROM aggregates WHERE ($1::text IS NULL OR aggregate_type_id = (SELECT id FROM aggregate_types WHERE type = $1)) AND ($3::uuid IS NULL OR aggregate_id > $3) ORDER BY 1 LIMIT $2', 'aggregates_to_update', [ aggregate_type, group_size, last_aggregate_id, ], ).to_a break if aggregate_rows.empty? last_aggregate_id = aggregate_rows.last['aggregate_id'] yielder << aggregate_rows.map { |x| x['aggregate_id'] } end end end |
#events_exists?(aggregate_id) ⇒ Boolean
145 146 147 |
# File 'lib/sequent/core/event_store.rb', line 145 def events_exists?(aggregate_id) Sequent.configuration.event_record_class.exists?(aggregate_id: aggregate_id) end |
#find_event_stream(aggregate_id) ⇒ Object
195 196 197 198 |
# File 'lib/sequent/core/event_store.rb', line 195 def find_event_stream(aggregate_id) record = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_id).first record&.event_stream end |
#load_event(aggregate_id, sequence_number) ⇒ Object
110 111 112 113 |
# File 'lib/sequent/core/event_store.rb', line 110 def load_event(aggregate_id, sequence_number) event_hash = query_function(connection, 'load_event', [aggregate_id, sequence_number]).first deserialize_event(event_hash) if event_hash end |
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number, loading them from the latest snapshot event onwards, if a snapshot is present
119 120 121 |
# File 'lib/sequent/core/event_store.rb', line 119 def load_events(aggregate_id) load_events_for_aggregates([aggregate_id])[0] end |
#load_events_for_aggregates(aggregate_ids) ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/sequent/core/event_store.rb', line 123 def load_events_for_aggregates(aggregate_ids) return [] if aggregate_ids.none? query_events(aggregate_ids) .group_by { |row| row['aggregate_id'] } .values .map do |rows| [ EventStream.new( aggregate_type: rows.first['aggregate_type'], aggregate_id: rows.first['aggregate_id'], events_partition_key: rows.first['events_partition_key'], ), rows.map { |row| deserialize_event(row) }, ] end end |
#load_events_since_marked_position(mark) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/sequent/core/event_store.rb', line 204 def load_events_since_marked_position(mark) events = connection.execute( Sequent.configuration.event_record_class .where(<<~SQL, {mark:}) xact_id >= pg_snapshot_xmin(CAST(:mark AS pg_snapshot))::text::bigint AND NOT pg_visible_in_snapshot(xact_id::text::xid8, CAST(:mark AS pg_snapshot)) SQL .select('*, pg_current_snapshot()::text AS mark') .to_sql, ).to_a return [[], mark] if events.empty? [ events.map { |hash| deserialize_event(hash) }, events[0]['mark'], ] end |
#permanently_delete_commands_without_events(aggregate_id:) ⇒ Object
272 273 274 275 276 |
# File 'lib/sequent/core/event_store.rb', line 272 def permanently_delete_commands_without_events(aggregate_id:) fail ArgumentError, 'aggregate_id must be specified' unless aggregate_id call_procedure(connection, 'permanently_delete_commands_without_events', [aggregate_id]) end |
#permanently_delete_event_stream(aggregate_id) ⇒ Object
264 265 266 |
# File 'lib/sequent/core/event_store.rb', line 264 def permanently_delete_event_stream(aggregate_id) permanently_delete_event_streams([aggregate_id]) end |
#permanently_delete_event_streams(aggregate_ids) ⇒ Object
268 269 270 |
# File 'lib/sequent/core/event_store.rb', line 268 def permanently_delete_event_streams(aggregate_ids) call_procedure(connection, 'permanently_delete_event_streams', [aggregate_ids.to_json]) end |
#position_mark ⇒ Object
200 201 202 |
# File 'lib/sequent/core/event_store.rb', line 200 def position_mark connection.exec_query('SELECT pg_current_snapshot()::text AS mark')[0]['mark'] end |
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
DEPRECATED: use replay_events_from_cursor instead.
154 155 156 157 158 |
# File 'lib/sequent/core/event_store.rb', line 154 def replay_events warn '[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`' events = yield.map { |event_hash| deserialize_event(event_hash) } publish_events(events) end |
#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an ‘EventRecord` cursor from the given block.
Prefer this replay method if your db adapter supports cursors.
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/sequent/core/event_store.rb', line 167 def replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) progress = 0 cursor = get_events.call ids_replayed = [] cursor.each_row(block_size: block_size).each do |record| event = deserialize_event(record) publish_events([event]) progress += 1 ids_replayed << record['aggregate_id'] if progress % block_size == 0 on_progress[progress, false, ids_replayed] ids_replayed.clear end end on_progress[progress, true, ids_replayed] end |
#stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ⇒ Object
Returns all events for the AggregateRoot ordered by sequence_number, disregarding snapshots.
This streaming is done in batches to prevent loading many events in memory all at once. A usecase for ignoring the snapshots is when events of a nested AggregateRoot need to be loaded up until a certain moment in time.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/sequent/core/event_store.rb', line 85 def stream_events_for_aggregate(aggregate_id, load_until: nil, &block) stream = find_event_stream(aggregate_id) fail ArgumentError, 'no stream found for this aggregate' if stream.blank? has_events = false # PostgreSQLCursor::Cursor does not support bind parameters, so bind parameters manually instead. sql = ActiveRecord::Base.sanitize_sql_array( [ 'SELECT * FROM load_events(:aggregate_ids, FALSE, :load_until)', { aggregate_ids: [aggregate_id].to_json, load_until: load_until, }, ], ) PostgreSQLCursor::Cursor.new(sql, {connection: connection}).each_row do |event_hash| has_events = true event = deserialize_event(event_hash) block.call([stream, event]) end fail ArgumentError, 'no events for this aggregate' unless has_events end |
#stream_exists?(aggregate_id) ⇒ Boolean
141 142 143 |
# File 'lib/sequent/core/event_store.rb', line 141 def stream_exists?(aggregate_id) Sequent.configuration.stream_record_class.exists?(aggregate_id: aggregate_id) end |
#update_unique_keys(event_streams) ⇒ Object
252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/sequent/core/event_store.rb', line 252 def update_unique_keys(event_streams) fail ArgumentError, 'array of stream records expected' unless event_streams.all? { |x| x.is_a?(EventStream) } call_procedure(connection, 'update_unique_keys', [event_streams.to_json]) rescue ActiveRecord::RecordNotUnique => e if AggregateKeyNotUniqueError.(e.) raise AggregateKeyNotUniqueError, e. else raise OptimisticLockingError end end |