Class: Sequent::Core::EventStore

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
ActiveRecord::ConnectionAdapters::Quoting
Defined in:
lib/sequent/core/event_store.rb

Direct Known Subclasses

TenantEventStore

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration = Sequent.configuration) ⇒ EventStore

Returns a new instance of EventStore.



15
16
17
18
# File 'lib/sequent/core/event_store.rb', line 15

# Builds an event store bound to the given configuration.
#
# @param configuration [Object] handed to the +configuration=+ writer;
#   defaults to +Sequent.configuration+ when omitted
def initialize(configuration = Sequent.configuration)
  self.configuration = configuration
  # NOTE(review): presumably a cache keyed by event type — its use is not
  # visible in this excerpt, confirm against the rest of the class.
  @event_types = ThreadSafe::Cache.new
end

Instance Attribute Details

#configuration ⇒ Object

Returns the value of attribute configuration.



12
13
14
# File 'lib/sequent/core/event_store.rb', line 12

# Reader for the configuration this event store was given.
#
# @return [Object] the current value of the +@configuration+ instance variable
def configuration
  @configuration
end

Instance Method Details

#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object

Returns the ids of aggregates that need a new snapshot.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/sequent/core/event_store.rb', line 69

# Returns the ids of aggregates that need a new snapshot.
#
# A stream row qualifies when its +aggregate_id+ sorts after
# +last_aggregate_id+, its +snapshot_threshold+ is set, and the gap between
# its highest non-snapshot event sequence number and its highest snapshot
# sequence number (0 when it has no snapshot) is at least that threshold.
# The selected rows are locked with +FOR UPDATE+.
#
# @param last_aggregate_id [Object, nil] only ids greater than this value are
#   returned; the query falls back to the empty string when it is SQL NULL
# @param limit [Integer] maximum number of ids to return
# @return [Array] the +aggregate_id+ column of each row, ordered by aggregate_id
def aggregates_that_need_snapshots(last_aggregate_id, limit = 10)
  quoted_streams = quote_table_name(stream_record_class.table_name)
  quoted_events = quote_table_name(event_record_class.table_name)
  sql = %Q{
SELECT aggregate_id
  FROM #{quoted_streams} stream
 WHERE aggregate_id > COALESCE(#{quote(last_aggregate_id)}, '')
   AND snapshot_threshold IS NOT NULL
   AND snapshot_threshold <= (
   (SELECT MAX(events.sequence_number) FROM #{quoted_events} events WHERE events.event_type <> #{quote(snapshot_event_class.name)} AND stream.aggregate_id = events.aggregate_id) -
   COALESCE((SELECT MAX(snapshots.sequence_number) FROM #{quoted_events} snapshots WHERE snapshots.event_type = #{quote(snapshot_event_class.name)} AND stream.aggregate_id = snapshots.aggregate_id), 0))
 ORDER BY aggregate_id
 LIMIT #{quote(limit)}
 FOR UPDATE
}
  rows = event_record_class.connection.select_all(sql)
  rows.map { |row| row['aggregate_id'] }
end

#commit_events(command, streams_with_events) ⇒ Object

Stores the events in the EventStore and publishes the events to the registered event_handlers.

`streams_with_events` is an enumerable of pairs from `StreamRecord` to arrays of uncommitted `Event`s.



27
28
29
30
# File 'lib/sequent/core/event_store.rb', line 27

# Stores the given events and then publishes them to the registered event
# handlers.
#
# @param command [Object] passed through to +store_events+
# @param streams_with_events [Enumerable] pairs whose second element is an
#   array of events; the arrays are concatenated, in order, for publishing
# @return [Object] whatever +publish_events+ returns
def commit_events(command, streams_with_events)
  store_events(command, streams_with_events)
  uncommitted = streams_with_events.flat_map { |_stream, events| events }
  publish_events(uncommitted, event_handlers)
end

#find_event_stream(aggregate_id) ⇒ Object



87
88
89
90
91
92
93
94
# File 'lib/sequent/core/event_store.rb', line 87

# Looks up the event stream belonging to an aggregate.
#
# @param aggregate_id [Object] id matched against the stream records'
#   +aggregate_id+
# @return [Object, nil] the +event_stream+ of the first matching stream
#   record, or nil when there is none
def find_event_stream(aggregate_id)
  stream_record = stream_record_class.where(aggregate_id: aggregate_id).first
  stream_record.event_stream if stream_record
end

#load_events(aggregate_id) ⇒ Object

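Returns the aggregate's event stream together with its events, starting from the most recent snapshot (or from the first event when there is no snapshot) and ordered by sequence_number. Returns nil when no stream exists for the aggregate.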



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/sequent/core/event_store.rb', line 35

# Loads the event stream of an aggregate together with its events.
#
# Only events whose sequence_number is at or after the aggregate's latest
# snapshot event are selected (all events when there is no snapshot). Rows are
# ordered by sequence_number, with a snapshot event sorted ahead of any other
# event that shares its sequence number.
#
# @param aggregate_id [Object] id of the aggregate to load
# @return [Array, nil] +[event_stream, events]+ where +events+ holds the
#   result of +deserialize_event+ for each row, or nil when no stream record
#   exists for +aggregate_id+
def load_events(aggregate_id)
  stream = stream_record_class.where(aggregate_id: aggregate_id).first
  return nil unless stream
  rows = event_record_class.connection.select_all(%Q{
SELECT event_type, event_json
  FROM #{quote_table_name event_record_class.table_name}
 WHERE aggregate_id = #{quote aggregate_id}
   AND sequence_number >= COALESCE((SELECT MAX(sequence_number)
                                FROM #{quote_table_name event_record_class.table_name}
                               WHERE event_type = #{quote snapshot_event_class.name}
                                 AND aggregate_id = #{quote aggregate_id}), 0)
 ORDER BY sequence_number ASC, (CASE event_type WHEN #{quote snapshot_event_class.name} THEN 0 ELSE 1 END) ASC
})
  # Use the non-destructive #map: the query result is only guaranteed to be
  # Enumerable, and ActiveRecord::Result#map! is not available on newer Rails.
  events = rows.map { |event_hash| deserialize_event(event_hash) }
  [stream.event_stream, events]
end

#replay_events ⇒ Object

Replays all events in the event store to the registered event_handlers.

Parameters:

  • block

    that returns the events.



61
62
63
64
# File 'lib/sequent/core/event_store.rb', line 61

# Replays events to the registered event handlers.
#
# The given block is called once and must return a collection of serialized
# events; each element is passed through +deserialize_event+ before the whole
# list is handed to +publish_events+.
#
# @yieldreturn [#map] the serialized events to replay
# @return [Object] whatever +publish_events+ returns
def replay_events
  replayed = yield.map do |serialized|
    deserialize_event(serialized)
  end
  publish_events(replayed, event_handlers)
end

#stream_exists?(aggregate_id) ⇒ Boolean

Returns:

  • (Boolean)



53
54
55
# File 'lib/sequent/core/event_store.rb', line 53

# Tells whether a stream record exists for the given aggregate.
#
# @param aggregate_id [Object] id matched against the stream records'
#   +aggregate_id+
# @return [Boolean] the result of +exists?+ on the stream record class
def stream_exists?(aggregate_id)
  conditions = { aggregate_id: aggregate_id }
  stream_record_class.exists?(conditions)
end