Class: Sequent::Core::EventStore

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
ActiveRecord::ConnectionAdapters::Quoting
Defined in:
lib/sequent/core/event_store.rb

Defined Under Namespace

Classes: DeserializeEventError, OptimisticLockingError

Constant Summary collapse

lambda do |progress, done, _|
  if done
    Sequent.logger.debug "Done replaying #{progress} events"
  else
    Sequent.logger.debug "Replayed #{progress} events"
  end
end

Instance Method Summary collapse

Constructor Details

#initializeEventStore

Returns a new instance of EventStore.



28
29
30
# File 'lib/sequent/core/event_store.rb', line 28

def initialize
  @event_types = ThreadSafe::Cache.new
end

Instance Method Details

#aggregate_query(aggregate_id) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/sequent/core/event_store.rb', line 74

def aggregate_query(aggregate_id)
  %Q{(
SELECT event_type, event_json
  FROM #{quote_table_name Sequent.configuration.event_record_class.table_name} AS o
WHERE aggregate_id = #{quote(aggregate_id)}
AND sequence_number >= COALESCE((SELECT MAX(sequence_number)
                           FROM #{quote_table_name Sequent.configuration.event_record_class.table_name} AS i
                           WHERE event_type = #{quote Sequent.configuration.snapshot_event_class.name}
                             AND i.aggregate_id = #{quote(aggregate_id)}), 0)
ORDER BY sequence_number ASC, (CASE event_type WHEN #{quote Sequent.configuration.snapshot_event_class.name} THEN 0 ELSE 1 END) ASC
)}
end

#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object

Returns the ids of aggregates that need a new snapshot.



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/sequent/core/event_store.rb', line 142

def aggregates_that_need_snapshots(last_aggregate_id, limit = 10)
  stream_table = quote_table_name Sequent.configuration.stream_record_class.table_name
  event_table = quote_table_name Sequent.configuration.event_record_class.table_name
  query = %Q{
SELECT aggregate_id
  FROM #{stream_table} stream
 WHERE aggregate_id::varchar > COALESCE(#{quote last_aggregate_id}, '')
   AND snapshot_threshold IS NOT NULL
   AND snapshot_threshold <= (
   (SELECT MAX(events.sequence_number) FROM #{event_table} events WHERE events.event_type <> #{quote Sequent.configuration.snapshot_event_class.name} AND stream.aggregate_id = events.aggregate_id) -
   COALESCE((SELECT MAX(snapshots.sequence_number) FROM #{event_table} snapshots WHERE snapshots.event_type = #{quote Sequent.configuration.snapshot_event_class.name} AND stream.aggregate_id = snapshots.aggregate_id), 0))
 ORDER BY aggregate_id
 LIMIT #{quote limit}
 FOR UPDATE
}
  Sequent.configuration.event_record_class.connection.select_all(query).map { |x| x['aggregate_id'] }
end

#commit_events(command, streams_with_events) ⇒ Object

Stores the events in the EventStore and publishes the events to the registered event_handlers.

The events are published according to the order in the tail of the given ‘streams_with_events` array pair.

Parameters:

  • command

    The command that caused the Events

  • streams_with_events

    is an enumerable of pairs from ‘StreamRecord` to arrays ordered uncommitted `Event`s.



43
44
45
46
47
48
49
50
# File 'lib/sequent/core/event_store.rb', line 43

def commit_events(command, streams_with_events)
  fail ArgumentError, "command is required" if command.nil?

  Sequent.logger.debug("[EventStore] Committing events for command #{command.class}")

  store_events(command, streams_with_events)
  publish_events(streams_with_events.flat_map { |_, events| events })
end

#events_exists?(aggregate_id) ⇒ Boolean

Returns:



91
92
93
# File 'lib/sequent/core/event_store.rb', line 91

def events_exists?(aggregate_id)
  Sequent.configuration.event_record_class.exists?(aggregate_id: aggregate_id)
end

#find_event_stream(aggregate_id) ⇒ Object



160
161
162
163
164
165
166
167
# File 'lib/sequent/core/event_store.rb', line 160

def find_event_stream(aggregate_id)
  record = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_id).first
  if record
    record.event_stream
  else
    nil
  end
end

#load_events(aggregate_id) ⇒ Object

Returns all events for the aggregate ordered by sequence_number



55
56
57
# File 'lib/sequent/core/event_store.rb', line 55

def load_events(aggregate_id)
  load_events_for_aggregates([aggregate_id])[0]
end

#load_events_for_aggregates(aggregate_ids) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/sequent/core/event_store.rb', line 59

def load_events_for_aggregates(aggregate_ids)
  return [] if aggregate_ids.none?

  streams = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_ids)

  query = aggregate_ids.uniq.map { |aggregate_id| aggregate_query(aggregate_id) }.join(" UNION ALL ")
  events = Sequent.configuration.event_record_class.connection.select_all(query).map! do |event_hash|
    deserialize_event(event_hash)
  end

  events
    .group_by { |event| event.aggregate_id }
    .map { |aggregate_id, _events| [streams.find { |stream_record| stream_record.aggregate_id == aggregate_id }.event_stream, _events] }
end

#replay_eventsObject

Replays all events in the event store to the registered event_handlers.

DEPRECATED: use replay_events_from_cursor instead.

Parameters:

  • block

    that returns the events.



99
100
101
102
103
# File 'lib/sequent/core/event_store.rb', line 99

def replay_events
  warn "[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`"
  events = yield.map { |event_hash| deserialize_event(event_hash) }
  publish_events(events)
end

#replay_events_from_cursor(block_size: 2000, get_events:, on_progress: PRINT_PROGRESS) ⇒ Object

Replays all events on an ‘EventRecord` cursor from the given block.

Prefer this replay method if your db adapter supports cursors.

Parameters:

  • get_events

    lambda that returns the events cursor

  • on_progress (defaults to: PRINT_PROGRESS)

    lambda that gets called on substantial progress



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/sequent/core/event_store.rb', line 112

def replay_events_from_cursor(block_size: 2000,
                              get_events:,
                              on_progress: PRINT_PROGRESS)
  progress = 0
  cursor = get_events.call
  ids_replayed = []
  cursor.each_row(block_size: block_size).each do |record|
    event = deserialize_event(record)
    publish_events([event])
    progress += 1
    ids_replayed << record['id']
    if progress % block_size == 0
      on_progress[progress, false, ids_replayed]
      ids_replayed.clear
    end
  end
  on_progress[progress, true, ids_replayed]
end

#stream_exists?(aggregate_id) ⇒ Boolean

Returns:



87
88
89
# File 'lib/sequent/core/event_store.rb', line 87

def stream_exists?(aggregate_id)
  Sequent.configuration.stream_record_class.exists?(aggregate_id: aggregate_id)
end