Class: Sequent::Test::CommandHandlerHelpers::FakeEventStore
- Inherits:
-
Object
- Object
- Sequent::Test::CommandHandlerHelpers::FakeEventStore
- Defined in:
- lib/sequent/test/command_handler_helpers.rb
Instance Method Summary collapse
- #commit_events(_, streams_with_events) ⇒ Object
- #event_streams_enumerator(aggregate_type: nil, group_size: 100) ⇒ Object
- #events_exists?(aggregate_id) ⇒ Boolean
- #find_event_stream(aggregate_id) ⇒ Object
-
#initialize ⇒ FakeEventStore
constructor
A new instance of FakeEventStore.
- #load_events(aggregate_id) ⇒ Object
- #load_events_for_aggregates(aggregate_ids) ⇒ Object
- #load_events_since_marked_position(mark) ⇒ Object
- #position_mark ⇒ Object
- #publish_events(events) ⇒ Object
- #stream_exists?(aggregate_id) ⇒ Boolean
Constructor Details
#initialize ⇒ FakeEventStore
Returns a new instance of FakeEventStore.
58 59 60 61 62 63 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 58 def initialize @event_streams = {} @all_events = {} @stored_events = [] @unique_keys = {} end |
Instance Method Details
#commit_events(_, streams_with_events) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 83 def commit_events(_, streams_with_events) keys = @unique_keys.dup.delete_if do |_key, aggregate_id| streams_with_events.any? { |stream, _| aggregate_id == stream.aggregate_id } end @unique_keys = keys.merge( *streams_with_events.map do |stream, _| stream.unique_keys.to_h { |scope, key| [[scope, key], stream.aggregate_id] } end, ) do |_key, id_1, id_2| if id_1 != id_2 stream, = streams_with_events.find { |s| s[0].aggregate_id == id_2 } fail Sequent::Core::AggregateKeyNotUniqueError, "duplicate unique key value for aggregate #{stream.aggregate_type} #{stream.aggregate_id}" end end streams_with_events.each do |event_stream, events| serialized = serialize_events(events) @event_streams[event_stream.aggregate_id] = event_stream @all_events[event_stream.aggregate_id] ||= [] @all_events[event_stream.aggregate_id] += serialized @stored_events += serialized end publish_events(streams_with_events.flat_map { |_, events| events }) end |
#event_streams_enumerator(aggregate_type: nil, group_size: 100) ⇒ Object
129 130 131 132 133 134 135 136 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 129 def event_streams_enumerator(aggregate_type: nil, group_size: 100) @event_streams .values .select { |es| aggregate_type.nil? || es.aggregate_type == aggregate_type } .sort_by { |es| [es.events_partition_key, es.aggregate_id] } .map(&:aggregate_id) .each_slice(group_size) end |
#events_exists?(aggregate_id) ⇒ Boolean
117 118 119 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 117 def events_exists?(aggregate_id) @event_streams[aggregate_id].present? end |
#find_event_stream(aggregate_id) ⇒ Object
79 80 81 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 79 def find_event_stream(aggregate_id) @event_streams[aggregate_id] end |
#load_events(aggregate_id) ⇒ Object
65 66 67 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 65 def load_events(aggregate_id) load_events_for_aggregates([aggregate_id])[0] end |
#load_events_for_aggregates(aggregate_ids) ⇒ Object
69 70 71 72 73 74 75 76 77 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 69 def load_events_for_aggregates(aggregate_ids) return [] if aggregate_ids.none? aggregate_ids.map do |aggregate_id| @event_streams[aggregate_id] end.compact.map do |event_stream| [event_stream, deserialize_events(@all_events[event_stream.aggregate_id])] end end |
#load_events_since_marked_position(mark) ⇒ Object
125 126 127 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 125 def load_events_since_marked_position(mark) [deserialize_events(@stored_events[mark..]), position_mark] end |
#position_mark ⇒ Object
121 122 123 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 121 def position_mark @stored_events.length end |
#publish_events(events) ⇒ Object
109 110 111 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 109 def publish_events(events) Sequent.configuration.event_publisher.publish_events(events) end |
#stream_exists?(aggregate_id) ⇒ Boolean
113 114 115 |
# File 'lib/sequent/test/command_handler_helpers.rb', line 113 def stream_exists?(aggregate_id) @event_streams.key?(aggregate_id) end |