Class: Sequent::Core::EventStore
- Inherits:
-
Object
- Object
- Sequent::Core::EventStore
- Extended by:
- Forwardable
- Includes:
- ActiveRecord::ConnectionAdapters::Quoting, Helpers::PgsqlHelpers, SnapshotStore
- Defined in:
- lib/sequent/core/event_store.rb
Defined Under Namespace
Classes: DeserializeEventError, OptimisticLockingError
Constant Summary collapse
- PRINT_PROGRESS =
->(progress, done, _) do next unless Sequent.logger.debug? if done Sequent.logger.debug("Done replaying #{progress} events") else Sequent.logger.debug("Replayed #{progress} events") end end
Instance Method Summary collapse
-
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
- #events_exists?(aggregate_id) ⇒ Boolean
- #find_event_stream(aggregate_id) ⇒ Object
- #load_event(aggregate_id, sequence_number) ⇒ Object
-
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number, loading them from the latest snapshot event onwards, if a snapshot is present.
- #load_events_for_aggregates(aggregate_ids) ⇒ Object
- #load_events_since_marked_position(mark) ⇒ Object
- #permanently_delete_commands_without_events(aggregate_id: nil, organization_id: nil) ⇒ Object
- #permanently_delete_event_stream(aggregate_id) ⇒ Object
- #permanently_delete_event_streams(aggregate_ids) ⇒ Object
- #position_mark ⇒ Object
-
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
-
#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an ‘EventRecord` cursor from the given block.
-
#stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ⇒ Object
Returns all events for the AggregateRoot ordered by sequence_number, disregarding snapshots.
- #stream_exists?(aggregate_id) ⇒ Boolean
Methods included from SnapshotStore
#aggregates_that_need_snapshots, #clear_aggregate_for_snapshotting, #clear_aggregates_for_snapshotting_with_last_event_before, #delete_all_snapshots, #delete_snapshots_before, #load_latest_snapshot, #mark_aggregate_for_snapshotting, #select_aggregates_for_snapshotting, #store_snapshots
Methods included from Helpers::PgsqlHelpers
#call_procedure, #query_function
Instance Method Details
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
The events are published according to the order in the tail of the given ‘streams_with_events` array pair.
48 49 50 51 52 53 54 55 |
# File 'lib/sequent/core/event_store.rb', line 48 def commit_events(command, streams_with_events) fail ArgumentError, 'command is required' if command.nil? Sequent.logger.debug("[EventStore] Committing events for command #{command.class}") store_events(command, streams_with_events) publish_events(streams_with_events.flat_map { |_, events| events }) end |
#events_exists?(aggregate_id) ⇒ Boolean
126 127 128 |
# File 'lib/sequent/core/event_store.rb', line 126 def events_exists?(aggregate_id) Sequent.configuration.event_record_class.exists?(aggregate_id: aggregate_id) end |
#find_event_stream(aggregate_id) ⇒ Object
176 177 178 179 |
# File 'lib/sequent/core/event_store.rb', line 176 def find_event_stream(aggregate_id) record = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_id).first record&.event_stream end |
#load_event(aggregate_id, sequence_number) ⇒ Object
91 92 93 94 |
# File 'lib/sequent/core/event_store.rb', line 91 def load_event(aggregate_id, sequence_number) event_hash = query_function(connection, 'load_event', [aggregate_id, sequence_number]).first deserialize_event(event_hash) if event_hash end |
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number, loading them from the latest snapshot event onwards, if a snapshot is present
100 101 102 |
# File 'lib/sequent/core/event_store.rb', line 100 def load_events(aggregate_id) load_events_for_aggregates([aggregate_id])[0] end |
#load_events_for_aggregates(aggregate_ids) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/sequent/core/event_store.rb', line 104 def load_events_for_aggregates(aggregate_ids) return [] if aggregate_ids.none? query_events(aggregate_ids) .group_by { |row| row['aggregate_id'] } .values .map do |rows| [ EventStream.new( aggregate_type: rows.first['aggregate_type'], aggregate_id: rows.first['aggregate_id'], events_partition_key: rows.first['events_partition_key'], ), rows.map { |row| deserialize_event(row) }, ] end end |
#load_events_since_marked_position(mark) ⇒ Object
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/sequent/core/event_store.rb', line 185 def load_events_since_marked_position(mark) events = connection.execute( Sequent.configuration.event_record_class .where(<<~SQL, {mark:}) xact_id >= pg_snapshot_xmin(CAST(:mark AS pg_snapshot))::text::bigint AND NOT pg_visible_in_snapshot(xact_id::text::xid8, CAST(:mark AS pg_snapshot)) SQL .select('*, pg_current_snapshot()::text AS mark') .to_sql, ).to_a return [[], mark] if events.empty? [ events.map { |hash| deserialize_event(hash) }, events[0]['mark'], ] end |
#permanently_delete_commands_without_events(aggregate_id: nil, organization_id: nil) ⇒ Object
212 213 214 215 216 217 218 |
# File 'lib/sequent/core/event_store.rb', line 212 def permanently_delete_commands_without_events(aggregate_id: nil, organization_id: nil) unless aggregate_id || organization_id fail ArgumentError, 'aggregate_id and/or organization_id must be specified' end call_procedure(connection, 'permanently_delete_commands_without_events', [aggregate_id, organization_id]) end |
#permanently_delete_event_stream(aggregate_id) ⇒ Object
204 205 206 |
# File 'lib/sequent/core/event_store.rb', line 204 def permanently_delete_event_stream(aggregate_id) permanently_delete_event_streams([aggregate_id]) end |
#permanently_delete_event_streams(aggregate_ids) ⇒ Object
208 209 210 |
# File 'lib/sequent/core/event_store.rb', line 208 def permanently_delete_event_streams(aggregate_ids) call_procedure(connection, 'permanently_delete_event_streams', [aggregate_ids.to_json]) end |
#position_mark ⇒ Object
181 182 183 |
# File 'lib/sequent/core/event_store.rb', line 181 def position_mark connection.exec_query('SELECT pg_current_snapshot()::text AS mark')[0]['mark'] end |
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
DEPRECATED: use replay_events_from_cursor
instead.
135 136 137 138 139 |
# File 'lib/sequent/core/event_store.rb', line 135 def replay_events warn '[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`' events = yield.map { |event_hash| deserialize_event(event_hash) } publish_events(events) end |
#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an ‘EventRecord` cursor from the given block.
Prefer this replay method if your db adapter supports cursors.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/sequent/core/event_store.rb', line 148 def replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) progress = 0 cursor = get_events.call ids_replayed = [] cursor.each_row(block_size: block_size).each do |record| event = deserialize_event(record) publish_events([event]) progress += 1 ids_replayed << record['aggregate_id'] if progress % block_size == 0 on_progress[progress, false, ids_replayed] ids_replayed.clear end end on_progress[progress, true, ids_replayed] end |
#stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ⇒ Object
Returns all events for the AggregateRoot ordered by sequence_number, disregarding snapshots.
This streaming is done in batches to prevent loading many events in memory all at once. A usecase for ignoring the snapshots is when events of a nested AggregateRoot need to be loaded up until a certain moment in time.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/sequent/core/event_store.rb', line 66 def stream_events_for_aggregate(aggregate_id, load_until: nil, &block) stream = find_event_stream(aggregate_id) fail ArgumentError, 'no stream found for this aggregate' if stream.blank? has_events = false # PostgreSQLCursor::Cursor does not support bind parameters, so bind parameters manually instead. sql = ActiveRecord::Base.sanitize_sql_array( [ 'SELECT * FROM load_events(:aggregate_ids, FALSE, :load_until)', { aggregate_ids: [aggregate_id].to_json, load_until: load_until, }, ], ) PostgreSQLCursor::Cursor.new(sql, {connection: connection}).each_row do |event_hash| has_events = true event = deserialize_event(event_hash) block.call([stream, event]) end fail ArgumentError, 'no events for this aggregate' unless has_events end |
#stream_exists?(aggregate_id) ⇒ Boolean
122 123 124 |
# File 'lib/sequent/core/event_store.rb', line 122 def stream_exists?(aggregate_id) Sequent.configuration.stream_record_class.exists?(aggregate_id: aggregate_id) end |