Class: Sequent::Core::EventStore
- Inherits:
-
Object
- Object
- Sequent::Core::EventStore
- Extended by:
- Forwardable
- Includes:
- ActiveRecord::ConnectionAdapters::Quoting
- Defined in:
- lib/sequent/core/event_store.rb
Defined Under Namespace
Classes: DeserializeEventError, OptimisticLockingError
Constant Summary collapse
- PRINT_PROGRESS =
# Default progress callback for replays: logs the number of events replayed so
# far at debug level, switching to a "Done" message once `done` is truthy.
# The third argument is ignored.
lambda do |progress, done, _|
  message = done ? "Done replaying #{progress} events" : "Replayed #{progress} events"
  Sequent.logger.debug message
end
Instance Method Summary collapse
- #aggregate_query(aggregate_id) ⇒ Object
-
#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object
Returns the ids of aggregates that need a new snapshot.
-
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
- #find_event_stream(aggregate_id) ⇒ Object
-
#initialize ⇒ EventStore
constructor
A new instance of EventStore.
-
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number.
- #load_events_for_aggregates(aggregate_ids) ⇒ Object
-
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
-
#replay_events_from_cursor(block_size: 2000, get_events:, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an `EventRecord` cursor from the given block.
- #stream_exists?(aggregate_id) ⇒ Boolean
Constructor Details
#initialize ⇒ EventStore
Returns a new instance of EventStore.
28 29 30 |
# File 'lib/sequent/core/event_store.rb', line 28
# Creates the store with a fresh ThreadSafe::Cache held in @event_types.
def initialize
  @event_types = ThreadSafe::Cache.new
end
Instance Method Details
#aggregate_query(aggregate_id) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/sequent/core/event_store.rb', line 66
# Builds the parenthesised SELECT that loads the events of a single aggregate.
#
# The query selects event_type and event_json for every event of the aggregate
# whose sequence_number is at or beyond that of its highest-numbered snapshot
# event (or 0 when there is no snapshot). Rows are ordered by sequence_number,
# with a snapshot event placed ahead of a regular event that shares the same
# sequence_number.
#
# Every interpolated value goes through `quote` / `quote_table_name`.
#
# @param aggregate_id the id of the aggregate whose events are wanted
# @return [String] the SQL fragment, wrapped in parentheses
def aggregate_query(aggregate_id)
  events_table = quote_table_name Sequent.configuration.event_record_class.table_name
  snapshot_type = quote Sequent.configuration.snapshot_event_class.name
  quoted_id = quote(aggregate_id)
  %Q{(
SELECT event_type, event_json
  FROM #{events_table} AS o
WHERE aggregate_id = #{quoted_id}
AND sequence_number >= COALESCE((SELECT MAX(sequence_number) FROM #{events_table} AS i WHERE event_type = #{snapshot_type} AND i.aggregate_id = #{quoted_id}), 0)
ORDER BY sequence_number ASC, (CASE event_type WHEN #{snapshot_type} THEN 0 ELSE 1 END) ASC
)}
end
#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object
Returns the ids of aggregates that need a new snapshot.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/sequent/core/event_store.rb', line 131
# Returns the ids of aggregates that need a new snapshot.
#
# A stream is selected when its snapshot_threshold is set and is no larger than
# the gap between the highest sequence_number of its non-snapshot events and
# the highest sequence_number of its snapshot events (0 when it has none).
# Only streams whose aggregate_id (cast to varchar) is greater than
# `last_aggregate_id` are considered; results are ordered by aggregate_id,
# capped at `limit`, and selected FOR UPDATE.
#
# @param last_aggregate_id lower bound (exclusive) on the aggregate ids returned
# @param limit maximum number of ids to return
# @return the `aggregate_id` value of every selected row
def aggregates_that_need_snapshots(last_aggregate_id, limit = 10)
  stream_table = quote_table_name Sequent.configuration.stream_record_class.table_name
  event_table = quote_table_name Sequent.configuration.event_record_class.table_name
  snapshot_type = quote Sequent.configuration.snapshot_event_class.name
  query = %Q{
SELECT aggregate_id
  FROM #{stream_table} stream
 WHERE aggregate_id::varchar > COALESCE(#{quote last_aggregate_id}, '')
   AND snapshot_threshold IS NOT NULL
   AND snapshot_threshold <= (
         (SELECT MAX(events.sequence_number) FROM #{event_table} events WHERE events.event_type <> #{snapshot_type} AND stream.aggregate_id = events.aggregate_id) -
         COALESCE((SELECT MAX(snapshots.sequence_number) FROM #{event_table} snapshots WHERE snapshots.event_type = #{snapshot_type} AND stream.aggregate_id = snapshots.aggregate_id), 0))
 ORDER BY aggregate_id
 LIMIT #{quote limit}
 FOR UPDATE
}
  rows = Sequent.configuration.event_record_class.connection.select_all(query)
  rows.map { |row| row['aggregate_id'] }
end
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
`streams_with_events` is an enumerable of pairs from `StreamRecord` to arrays of uncommitted `Event`s.
39 40 41 42 |
# File 'lib/sequent/core/event_store.rb', line 39
# Stores the events and then publishes them.
#
# @param command the command passed along to `store_events`
# @param streams_with_events enumerable of pairs whose second element is the
#   list of events; the lists are flattened into one array for publishing
def commit_events(command, streams_with_events)
  store_events(command, streams_with_events)
  uncommitted_events = streams_with_events.flat_map { |_stream, events| events }
  publish_events(uncommitted_events)
end
#find_event_stream(aggregate_id) ⇒ Object
149 150 151 152 153 154 155 156 |
# File 'lib/sequent/core/event_store.rb', line 149
# Looks up the stream record for the aggregate and returns its event stream.
#
# @param aggregate_id the id of the aggregate to look up
# @return the record's `event_stream`, or nil when no stream record exists
def find_event_stream(aggregate_id)
  stream_record = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_id).first
  stream_record && stream_record.event_stream
end
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number
47 48 49 |
# File 'lib/sequent/core/event_store.rb', line 47
# Loads the events of a single aggregate by delegating to
# `load_events_for_aggregates` with a one-element list.
#
# @param aggregate_id the id of the aggregate to load
# @return the first entry of the delegate's result, or nil when it is empty
def load_events(aggregate_id)
  load_events_for_aggregates([aggregate_id]).first
end
#load_events_for_aggregates(aggregate_ids) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/sequent/core/event_store.rb', line 51
# Loads the events of several aggregates with a single UNION ALL query.
#
# One `aggregate_query` is generated per distinct id, the resulting rows are
# deserialized, and the events are grouped by their `aggregate_id`.
#
# @param aggregate_ids list of aggregate ids to load
# @return [Array] one `[event_stream, events]` pair per aggregate that has
#   events; an empty array when `aggregate_ids` has no truthy element
def load_events_for_aggregates(aggregate_ids)
  return [] if aggregate_ids.none?

  streams = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_ids)

  query = aggregate_ids.uniq.map { |aggregate_id| aggregate_query(aggregate_id) }.join(" UNION ALL ")
  # `map`, not `map!`: the result of `select_all` is only guaranteed to be
  # Enumerable (ActiveRecord::Result dropped `map!` in Rails 7.0).
  events = Sequent.configuration.event_record_class.connection.select_all(query).map do |event_hash|
    deserialize_event(event_hash)
  end

  # Index the stream records once so each aggregate is resolved with a hash
  # lookup instead of a linear scan. `||=` keeps the first record per id.
  streams_by_aggregate_id = streams.each_with_object({}) do |stream_record, index|
    index[stream_record.aggregate_id] ||= stream_record
  end

  events
    .group_by(&:aggregate_id)
    .map { |aggregate_id, aggregate_events| [streams_by_aggregate_id[aggregate_id].event_stream, aggregate_events] }
end
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
DEPRECATED: use `replay_events_from_cursor` instead.
88 89 90 91 92 |
# File 'lib/sequent/core/event_store.rb', line 88
# Replays the events produced by the given block.
#
# Emits a deprecation warning, deserializes every element yielded by the block
# and publishes the resulting events in one call.
#
# @deprecated use `replay_events_from_cursor` instead
# @yieldreturn a collection of event hashes accepted by `deserialize_event`
def replay_events
  warn "[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`"
  event_hashes = yield
  publish_events(event_hashes.map { |event_hash| deserialize_event(event_hash) })
end
#replay_events_from_cursor(block_size: 2000, get_events:, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an `EventRecord` cursor from the given block.
Prefer this replay method if your db adapter supports cursors.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/sequent/core/event_store.rb', line 101
# Replays every record of a cursor, publishing the events one at a time.
#
# @param block_size [Integer] passed to the cursor's `each_row`, and also the
#   number of events between two progress reports
# @param get_events callable returning the cursor; the cursor must respond to
#   `each_row(block_size:)` and yield records that support `record['id']`
# @param on_progress callable invoked as `on_progress[progress, done, ids]`:
#   after every `block_size` events with `done` false, and once at the end
#   with `done` true. `ids` holds the ids replayed since the previous report.
def replay_events_from_cursor(block_size: 2000, get_events:, on_progress: PRINT_PROGRESS)
  progress = 0
  cursor = get_events.call
  ids_replayed = []
  cursor.each_row(block_size: block_size).each do |record|
    event = deserialize_event(record)
    publish_events([event])
    progress += 1
    ids_replayed << record['id']
    next unless (progress % block_size).zero?

    on_progress[progress, false, ids_replayed]
    # Start a new batch with a fresh array rather than clearing in place, so a
    # callback that keeps a reference to the reported batch is not mutated
    # behind its back.
    ids_replayed = []
  end
  on_progress[progress, true, ids_replayed]
end
#stream_exists?(aggregate_id) ⇒ Boolean
79 80 81 |
# File 'lib/sequent/core/event_store.rb', line 79
# Reports whether a stream record exists for the aggregate.
#
# @param aggregate_id the id of the aggregate to check
# @return [Boolean] the result of `exists?` on the configured stream record class
def stream_exists?(aggregate_id)
  stream_records = Sequent.configuration.stream_record_class
  stream_records.exists?(aggregate_id: aggregate_id)
end