Class: Sequent::Core::EventStore

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
ActiveRecord::ConnectionAdapters::Quoting
Defined in:
lib/sequent/core/event_store.rb

Defined Under Namespace

Classes: DeserializeEventError, OptimisticLockingError

Constant Summary collapse

->(progress, done, _) do
  if done
    Sequent.logger.debug "Done replaying #{progress} events"
  else
    Sequent.logger.debug "Replayed #{progress} events"
  end
end

Instance Method Summary collapse

Constructor Details

#initializeEventStore

Returns a new instance of EventStore.



29
30
31
# File 'lib/sequent/core/event_store.rb', line 29

def initialize
  @event_types = ThreadSafe::Cache.new
end

Instance Method Details

#aggregate_query(aggregate_id) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/sequent/core/event_store.rb', line 82

def aggregate_query(aggregate_id)
  "    (\n    SELECT event_type, event_json\n      FROM \#{quote_table_name Sequent.configuration.event_record_class.table_name} AS o\n    WHERE aggregate_id = \#{quote(aggregate_id)}\n    AND sequence_number >= COALESCE((SELECT MAX(sequence_number)\n                                     FROM \#{quote_table_name Sequent.configuration.event_record_class.table_name} AS i\n                                     WHERE event_type = \#{quote Sequent.configuration.snapshot_event_class.name}\n                                       AND i.aggregate_id = \#{quote(aggregate_id)}), 0)\n    ORDER BY sequence_number ASC, (CASE event_type WHEN \#{quote Sequent.configuration.snapshot_event_class.name} THEN 0 ELSE 1 END) ASC\n    )\n  SQL\nend\n".chomp

#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object

Returns the ids of aggregates that need a new snapshot.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/sequent/core/event_store.rb', line 151

def aggregates_that_need_snapshots(last_aggregate_id, limit = 10)
  stream_table = quote_table_name Sequent.configuration.stream_record_class.table_name
  event_table = quote_table_name Sequent.configuration.event_record_class.table_name
  query = "    SELECT aggregate_id\n      FROM \#{stream_table} stream\n     WHERE aggregate_id::varchar > COALESCE(\#{quote last_aggregate_id}, '')\n       AND snapshot_threshold IS NOT NULL\n       AND snapshot_threshold <= (\n             (SELECT MAX(events.sequence_number) FROM \#{event_table} events WHERE events.event_type <> \#{quote Sequent.configuration.snapshot_event_class.name} AND stream.aggregate_id = events.aggregate_id) -\n             COALESCE((SELECT MAX(snapshots.sequence_number) FROM \#{event_table} snapshots WHERE snapshots.event_type = \#{quote Sequent.configuration.snapshot_event_class.name} AND stream.aggregate_id = snapshots.aggregate_id), 0))\n     ORDER BY aggregate_id\n     LIMIT \#{quote limit}\n     FOR UPDATE\n  SQL\n  Sequent.configuration.event_record_class.connection.select_all(query).map { |x| x['aggregate_id'] }\nend\n".chomp

#commit_events(command, streams_with_events) ⇒ Object

Stores the events in the EventStore and publishes the events to the registered event_handlers.

The events are published according to the order in the tail of the given ‘streams_with_events` array pair.

Parameters:

  • command

    The command that caused the Events

  • streams_with_events

    is an enumerable of pairs from ‘StreamRecord` to arrays ordered uncommitted `Event`s.



44
45
46
47
48
49
50
51
# File 'lib/sequent/core/event_store.rb', line 44

def commit_events(command, streams_with_events)
  fail ArgumentError, 'command is required' if command.nil?

  Sequent.logger.debug("[EventStore] Committing events for command #{command.class}")

  store_events(command, streams_with_events)
  publish_events(streams_with_events.flat_map { |_, events| events })
end

#events_exists?(aggregate_id) ⇒ Boolean

Returns:



101
102
103
# File 'lib/sequent/core/event_store.rb', line 101

def events_exists?(aggregate_id)
  Sequent.configuration.event_record_class.exists?(aggregate_id: aggregate_id)
end

#find_event_stream(aggregate_id) ⇒ Object



169
170
171
172
# File 'lib/sequent/core/event_store.rb', line 169

def find_event_stream(aggregate_id)
  record = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_id).first
  record&.event_stream
end

#load_events(aggregate_id) ⇒ Object

Returns all events for the aggregate ordered by sequence_number



56
57
58
# File 'lib/sequent/core/event_store.rb', line 56

def load_events(aggregate_id)
  load_events_for_aggregates([aggregate_id])[0]
end

#load_events_for_aggregates(aggregate_ids) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/sequent/core/event_store.rb', line 60

def load_events_for_aggregates(aggregate_ids)
  return [] if aggregate_ids.none?

  streams = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_ids)

  query = aggregate_ids.uniq.map { |aggregate_id| aggregate_query(aggregate_id) }.join(' UNION ALL ')
  events = Sequent.configuration.event_record_class.connection.select_all(query).map do |event_hash|
    deserialize_event(event_hash)
  end

  events
    .group_by(&:aggregate_id)
    .map do |aggregate_id, es|
    [
      streams.find do |stream_record|
        stream_record.aggregate_id == aggregate_id
      end.event_stream,
      es,
    ]
  end
end

#replay_eventsObject

Replays all events in the event store to the registered event_handlers.

DEPRECATED: use replay_events_from_cursor instead.

Parameters:

  • block

    that returns the events.



109
110
111
112
113
# File 'lib/sequent/core/event_store.rb', line 109

def replay_events
  warn '[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`'
  events = yield.map { |event_hash| deserialize_event(event_hash) }
  publish_events(events)
end

#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object

Replays all events on an ‘EventRecord` cursor from the given block.

Prefer this replay method if your db adapter supports cursors.

Parameters:

  • get_events

    lambda that returns the events cursor

  • on_progress (defaults to: PRINT_PROGRESS)

    lambda that gets called on substantial progress



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/sequent/core/event_store.rb', line 122

def replay_events_from_cursor(get_events:, block_size: 2000,
                              on_progress: PRINT_PROGRESS)
  progress = 0
  cursor = get_events.call
  ids_replayed = []
  cursor.each_row(block_size: block_size).each do |record|
    event = deserialize_event(record)
    publish_events([event])
    progress += 1
    ids_replayed << record['id']
    if progress % block_size == 0
      on_progress[progress, false, ids_replayed]
      ids_replayed.clear
    end
  end
  on_progress[progress, true, ids_replayed]
end

#stream_exists?(aggregate_id) ⇒ Boolean

Returns:



97
98
99
# File 'lib/sequent/core/event_store.rb', line 97

def stream_exists?(aggregate_id)
  Sequent.configuration.stream_record_class.exists?(aggregate_id: aggregate_id)
end