Class: RubyEventStore::InMemoryRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/in_memory_repository.rb

Defined Under Namespace

Classes: EventInStream, UnsupportedVersionAnyUsage

Instance Method Summary collapse

Constructor Details

#initialize(serializer: NULL, ensure_supported_any_usage: false) ⇒ InMemoryRepository

Returns a new instance of InMemoryRepository.


26
27
28
29
30
31
32
# File 'lib/ruby_event_store/in_memory_repository.rb', line 26

def initialize(serializer: NULL, ensure_supported_any_usage: false)
  @serializer = serializer
  @streams = Hash.new { |h, k| h[k] = Array.new }
  @mutex   = Mutex.new
  @storage = Hash.new
  @ensure_supported_any_usage = ensure_supported_any_usage
end

Instance Method Details

#append_to_stream(records, stream, expected_version) ⇒ Object


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/ruby_event_store/in_memory_repository.rb', line 34

def append_to_stream(records, stream, expected_version)
  serialized_records = records.map { |record| record.serialize(serializer) }

  with_synchronize(expected_version, stream) do |resolved_version|
    ensure_supported_any_usage(resolved_version, stream)
    raise WrongExpectedEventVersion unless resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)

    serialized_records.each_with_index do |serialized_record, index|
      raise EventDuplicatedInStream if has_event?(serialized_record.event_id)
      storage[serialized_record.event_id] = serialized_record
      add_to_stream(stream, serialized_record, resolved_version, index)
    end
  end
  self
end

#count(spec) ⇒ Object


101
102
103
# File 'lib/ruby_event_store/in_memory_repository.rb', line 101

def count(spec)
  read_scope(spec).count
end

#delete_stream(stream) ⇒ Object


65
66
67
# File 'lib/ruby_event_store/in_memory_repository.rb', line 65

def delete_stream(stream)
  streams.delete(stream.name)
end

#global_position(event_id) ⇒ Object


133
134
135
# File 'lib/ruby_event_store/in_memory_repository.rb', line 133

def global_position(event_id)
  storage.keys.index(event_id) or raise EventNotFound.new(event_id)
end

#has_event?(event_id) ⇒ Boolean

Returns:

  • (Boolean)

69
70
71
# File 'lib/ruby_event_store/in_memory_repository.rb', line 69

def has_event?(event_id)
  storage.has_key?(event_id)
end

#last_stream_event(stream) ⇒ Object


73
74
75
76
# File 'lib/ruby_event_store/in_memory_repository.rb', line 73

def last_stream_event(stream)
  last_id = event_ids_of_stream(stream).last
  storage.fetch(last_id).deserialize(serializer) if last_id
end

50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/ruby_event_store/in_memory_repository.rb', line 50

def link_to_stream(event_ids, stream, expected_version)
  serialized_records = event_ids.map { |id| read_event(id) }

  with_synchronize(expected_version, stream) do |resolved_version|
    ensure_supported_any_usage(resolved_version, stream)
    raise WrongExpectedEventVersion unless resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)

    serialized_records.each_with_index do |serialized_record, index|
      raise EventDuplicatedInStream if has_event_in_stream?(serialized_record.event_id, stream.name)
      add_to_stream(stream, serialized_record, resolved_version, index)
    end
  end
  self
end

#position_in_stream(event_id, stream) ⇒ Object


127
128
129
130
131
# File 'lib/ruby_event_store/in_memory_repository.rb', line 127

def position_in_stream(event_id, stream)
  event_in_stream = streams[stream.name].find {|event_in_stream| event_in_stream.event_id.eql?(event_id) }
  raise EventNotFoundInStream if event_in_stream.nil?
  event_in_stream.position
end

#read(spec) ⇒ Object


78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/ruby_event_store/in_memory_repository.rb', line 78

def read(spec)
  serialized_records = read_scope(spec)
  if spec.batched?
    batch_reader = ->(offset, limit) do
      serialized_records
        .drop(offset)
        .take(limit)
        .map { |serialized_record| serialized_record.deserialize(serializer) }
    end
    BatchEnumerator.new(spec.batch_size, serialized_records.size, batch_reader).each
  elsif spec.first?
    serialized_records.first&.deserialize(serializer)
  elsif spec.last?
    serialized_records.last&.deserialize(serializer)
  else
    Enumerator.new do |y|
      serialized_records.each do |serialized_record|
        y << serialized_record.deserialize(serializer)
      end
    end
  end
end

#streams_of(event_id) ⇒ Object


121
122
123
124
125
# File 'lib/ruby_event_store/in_memory_repository.rb', line 121

def streams_of(event_id)
  streams
    .select { |name,| has_event_in_stream?(event_id, name) }
    .map    { |name,| Stream.new(name) }
end

#update_messages(records) ⇒ Object


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/ruby_event_store/in_memory_repository.rb', line 105

def update_messages(records)
  records.each do |record|
    read_event(record.event_id)
    serialized_record =
      Record.new(
        event_id:   record.event_id,
        event_type: record.event_type,
        data:       record.data,
        metadata:   record.,
        timestamp:  Time.iso8601(storage.fetch(record.event_id).timestamp),
        valid_at:   record.valid_at,
      ).serialize(serializer)
    storage[record.event_id] = serialized_record
  end
end