Class: RubyEventStore::InMemoryRepository
- Inherits:
-
Object
- Object
- RubyEventStore::InMemoryRepository
- Defined in:
- lib/ruby_event_store/in_memory_repository.rb
Instance Method Summary
- #append_to_stream(records, stream, expected_version) ⇒ Object
- #count(spec) ⇒ Object
- #delete_stream(stream) ⇒ Object
- #has_event?(event_id) ⇒ Boolean
- #initialize(serializer: NULL) ⇒ InMemoryRepository (constructor): returns a new instance of InMemoryRepository.
- #last_stream_event(stream) ⇒ Object
- #link_to_stream(event_ids, stream, expected_version) ⇒ Object
- #read(spec) ⇒ Object
- #streams_of(event_id) ⇒ Object
- #update_messages(records) ⇒ Object
Constructor Details
#initialize(serializer: NULL) ⇒ InMemoryRepository
Returns a new instance of InMemoryRepository.
7 8 9 10 11 12 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 7

# Builds an empty repository.
#
# Sets up an empty record storage hash, a streams hash whose missing keys
# are lazily initialised with their own fresh array, and a mutex.
#
# @param serializer [Object] kept in @serializer; defaults to NULL
def initialize(serializer: NULL)
  @storage = {}
  @streams = Hash.new { |hash, stream_name| hash[stream_name] = [] }
  @mutex = Mutex.new
  @serializer = serializer
end
Instance Method Details
#append_to_stream(records, stream, expected_version) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 14

# Serializes the given records and appends them to a stream.
#
# Records are serialized before entering the with_synchronize block; the
# version check and the writes happen inside it.
#
# @param records [Enumerable] items responding to #serialize
# @param stream [Object] responds to #name
# @param expected_version [Object] forwarded to with_synchronize
# @return [self]
# @raise [WrongExpectedEventVersion] when last_stream_version(stream) is not
#   the version yielded by with_synchronize
# @raise [EventDuplicatedInStream] when an event id is already present in
#   storage; records processed before the duplicate remain written
def append_to_stream(records, stream, expected_version)
  pending = records.map { |record| record.serialize(serializer) }

  with_synchronize(expected_version, stream) do |resolved_version|
    # NOTE(review): equal? is an identity comparison, not ==; kept as-is.
    version_matches = last_stream_version(stream).equal?(resolved_version)
    raise WrongExpectedEventVersion unless version_matches

    pending.each do |item|
      event_id = item.event_id
      raise EventDuplicatedInStream if has_event?(event_id)

      storage[event_id] = item
      streams[stream.name] << event_id
    end
  end

  self
end
#count(spec) ⇒ Object
79 80 81 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 79

# Counts the records selected by the given specification.
#
# @param spec [Object] forwarded unchanged to read_scope
# @return [Object] the result of calling #count on read_scope(spec)
def count(spec)
  matching = read_scope(spec)
  matching.count
end
#delete_stream(stream) ⇒ Object
43 44 45 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 43

# Removes the entry for the stream's name from the streams collection.
# Record storage is not touched here.
#
# @param stream [Object] responds to #name
# @return [Object] whatever streams.delete returns for that name
def delete_stream(stream)
  stream_name = stream.name
  streams.delete(stream_name)
end
#has_event?(event_id) ⇒ Boolean
47 48 49 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 47

# Tells whether storage holds an entry keyed by the given event id.
#
# @param event_id [Object] key looked up in storage
# @return [Boolean]
def has_event?(event_id)
  storage.key?(event_id)
end
#last_stream_event(stream) ⇒ Object
51 52 53 54 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 51

# Returns the deserialized record for the last event id of a stream.
#
# @param stream [Object] forwarded to event_ids_of_stream
# @return [Object, nil] the deserialized record, or nil when the stream
#   yields no last id
def last_stream_event(stream)
  newest_id = event_ids_of_stream(stream).last
  return unless newest_id

  storage.fetch(newest_id).deserialize(serializer)
end
#link_to_stream(event_ids, stream, expected_version) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 29

# Links already known events to a stream by appending their ids to it.
#
# Each id is looked up via read_event before entering the with_synchronize
# block; storage itself is not written to.
#
# @param event_ids [Enumerable] ids passed one by one to read_event
# @param stream [Object] responds to #name
# @param expected_version [Object] forwarded to with_synchronize
# @return [self]
# @raise [WrongExpectedEventVersion] when last_stream_version(stream) is not
#   the version yielded by with_synchronize
# @raise [EventDuplicatedInStream] when the stream already contains one of
#   the ids; ids processed before the duplicate remain linked
def link_to_stream(event_ids, stream, expected_version)
  linked = event_ids.map { |event_id| read_event(event_id) }

  with_synchronize(expected_version, stream) do |resolved_version|
    # NOTE(review): equal? is an identity comparison, not ==; kept as-is.
    version_matches = last_stream_version(stream).equal?(resolved_version)
    raise WrongExpectedEventVersion unless version_matches

    linked.each do |item|
      event_id = item.event_id
      raise EventDuplicatedInStream if has_event_in_stream?(event_id, stream.name)

      streams[stream.name] << event_id
    end
  end

  self
end
#read(spec) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 56

# Reads records selected by the specification, deserializing them on the way out.
#
# The shape of the result depends on the specification:
# * batched? - the result of BatchEnumerator#each, fed by a reader that
#   slices the scope with drop/take
# * first?   - the first record, or nil when the scope is empty
# * last?    - the last record, or nil when the scope is empty
# * otherwise an Enumerator that deserializes records as they are consumed
#
# @param spec [Object] responds to batched?, batch_size, first? and last?
# @return [Object, nil]
def read(spec)
  scope = read_scope(spec)

  if spec.batched?
    fetch_batch = ->(offset, limit) {
      scope.drop(offset).take(limit).map { |item| item.deserialize(serializer) }
    }
    return BatchEnumerator.new(spec.batch_size, scope.size, fetch_batch).each
  end

  return scope.first&.deserialize(serializer) if spec.first?
  return scope.last&.deserialize(serializer) if spec.last?

  Enumerator.new do |yielder|
    scope.each { |item| yielder << item.deserialize(serializer) }
  end
end
#streams_of(event_id) ⇒ Object
99 100 101 102 103 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 99

# Lists the streams that contain the given event.
#
# @param event_id [Object] id checked against every stream name via
#   has_event_in_stream?
# @return [Array<Stream>] one Stream per matching stream name
def streams_of(event_id)
  containing = streams.select { |stream_name, _ids| has_event_in_stream?(event_id, stream_name) }
  containing.map { |stream_name, _ids| Stream.new(stream_name) }
end
#update_messages(records) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/ruby_event_store/in_memory_repository.rb', line 83

# Overwrites the stored form of each given record.
#
# The replacement takes event_id, event_type, data, metadata and valid_at
# from the incoming record, but keeps the timestamp of the entry already in
# storage (parsed from its ISO 8601 form).
#
# @param records [Enumerable] records to re-store; each responds to
#   event_id, event_type, data, metadata and valid_at
# @return [Object] the result of records.each
def update_messages(records)
  records.each do |record|
    # Return value unused; presumably this raises for unknown ids - confirm
    # against read_event.
    read_event(record.event_id)
    serialized_record =
      Record.new(
        event_id: record.event_id,
        event_type: record.event_type,
        data: record.data,
        metadata: record.metadata,
        # The original timestamp is preserved rather than taken from `record`.
        timestamp: Time.iso8601(storage.fetch(record.event_id).timestamp),
        valid_at: record.valid_at,
      ).serialize(serializer)
    storage[record.event_id] = serialized_record
  end
end