Class: Sourced::Backends::TestBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/sourced/backends/test_backend.rb

Defined Under Namespace

Classes: Group, State

Constant Summary collapse

# Immutable summary value with three positional members, in this order:
# +stream_count+, +max_global_seq+ and +groups+.
Stats = Data.define(:stream_count, :max_global_seq, :groups)

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TestBackend

Returns a new instance of TestBackend.



87
88
89
90
91
92
# File 'lib/sourced/backends/test_backend.rb', line 87

# Sets up an empty backend.
#
# Calls +clear!+ first, then creates the mutex and the transaction
# bookkeeping variables (+@in_tx+ starts out false, +@tx_id+ starts out nil).
def initialize
  clear!
  @mutex = Thread::Mutex.new
  @in_tx = false
  @tx_id = nil
end

Instance Method Details

#ack_on(group_id, event_id) ⇒ Object



182
183
184
185
186
187
# File 'lib/sourced/backends/test_backend.rb', line 182

# Acknowledges an event on behalf of a consumer group.
#
# The group is looked up in +@state.groups+ and asked to +ack_on+ the given
# event; both steps happen inside a single +transaction+.
#
# @param group_id [Object] key used to look the group up in +@state.groups+
# @param event_id [Object] identifier handed to the group's +ack_on+
# @yield forwarded unchanged to the group's +ack_on+
# @return [Object] whatever the group's +ack_on+ returns
def ack_on(group_id, event_id, &block)
  transaction { @state.groups[group_id].ack_on(event_id, &block) }
end

#append_to_stream(stream_id, events) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/sourced/backends/test_backend.rb', line 229

# Appends a batch of events to a stream.
#
# Inside one +transaction+ the batch is first passed to +check_unique_seq!+,
# then every event is recorded in the per-correlation index, the per-stream
# index, the global event list, and the stream/sequence index. Once the
# transaction has finished, every group in +@state.groups+ is reindexed.
#
# NOTE(review): the reindex step runs outside the transaction block; confirm
# that this is intentional.
#
# @param stream_id [Object] key of the target stream
# @param events [Enumerable] events responding to +correlation_id+
# @return [true]
def append_to_stream(stream_id, events)
  transaction do
    check_unique_seq!(events)

    events.each do |evt|
      store = @state
      store.events_by_correlation_id[evt.correlation_id] << evt
      store.events_by_stream_id[stream_id] << evt
      store.events << evt
      store.stream_id_seq_index[seq_key(stream_id, evt)] = true
    end
  end

  @state.groups.each_value { |group| group.reindex }
  true
end

#clear! ⇒ Object



167
168
169
# File 'lib/sourced/backends/test_backend.rb', line 167

# Replaces the current state with a brand-new, empty +State+.
#
# @return [State] the freshly created state
def clear! = (@state = State.new)

#events ⇒ Object



94
# File 'lib/sourced/backends/test_backend.rb', line 94

# The event collection held by the current state.
#
# @return [Object] the value of +@state.events+ (the stored object itself,
#   not a copy)
def events
  @state.events
end

#inspect ⇒ Object



96
97
98
# File 'lib/sourced/backends/test_backend.rb', line 96

def inspect
  %(<#{self.class} events:#{events.size} streams:#{@state.events_by_stream_id.size}>)
end

#installed? ⇒ Boolean

Returns:

  • (Boolean)


171
# File 'lib/sourced/backends/test_backend.rb', line 171

# Reports whether the backend is installed; this backend always answers true.
#
# @return [Boolean] always +true+
def installed?
  true
end

#next_command ⇒ Object



195
196
197
198
199
# File 'lib/sourced/backends/test_backend.rb', line 195

# Delegates to the state's +next_command+ inside a +transaction+.
#
# @yield forwarded unchanged to +@state.next_command+
# @return [Object] whatever +@state.next_command+ returns
def next_command(&block)
  transaction { @state.next_command(&block) }
end

#read_correlation_batch(event_id) ⇒ Object



244
245
246
247
248
# File 'lib/sourced/backends/test_backend.rb', line 244

# Returns the events that share a correlation id with the given event.
#
# The event is located by id in the global event list; its +correlation_id+
# is then used to read the per-correlation index.
#
# @param event_id [Object] id of the event to look up
# @return [Object] the index entry for that correlation id, or an empty
#   array when no event has the given id
def read_correlation_batch(event_id)
  match = @state.events.find { |candidate| candidate.id == event_id }
  match ? @state.events_by_correlation_id[match.correlation_id] : []
end

#read_event_stream(stream_id, after: nil, upto: nil) ⇒ Object



250
251
252
253
254
255
# File 'lib/sourced/backends/test_backend.rb', line 250

# Reads the events stored for a stream, optionally bounded by sequence.
#
# With neither bound given, the stored collection itself is returned.
# Otherwise a filtered selection is returned.
#
# @param stream_id [Object] key of the stream in the per-stream index
# @param after [Object, nil] when given, keep only events with +seq+ greater
#   than this value
# @param upto [Object, nil] when given, keep only events with +seq+ less than
#   or equal to this value
# @return [Object] the matching events
def read_event_stream(stream_id, after: nil, upto: nil)
  stream = @state.events_by_stream_id[stream_id]
  return stream unless after || upto

  stream.select do |evt|
    (!after || evt.seq > after) && (!upto || evt.seq <= upto)
  end
end

#reserve_next_for_reactor(reactor) ⇒ Object



173
174
175
176
177
178
179
180
# File 'lib/sourced/backends/test_backend.rb', line 173

# Asks the reactor's consumer group to reserve its next item.
#
# The group id and the start position are read from the reactor's
# +consumer_info+ before the transaction opens; +start_from+ is invoked with
# +call+ to obtain the actual value. Inside the transaction the group is
# looked up and its +reserve_next+ is called with the reactor's
# +handled_events+ and that start value.
#
# @param reactor [Object] must respond to +consumer_info+ and +handled_events+
# @yield forwarded unchanged to the group's +reserve_next+
# @return [Object] whatever the group's +reserve_next+ returns
def reserve_next_for_reactor(reactor, &block)
  group_key = reactor.consumer_info.group_id
  starting_point = reactor.consumer_info.start_from.call

  transaction do
    @state.groups[group_key].reserve_next(reactor.handled_events, starting_point, &block)
  end
end

#schedule_commands(commands) ⇒ Object



189
190
191
192
193
# File 'lib/sourced/backends/test_backend.rb', line 189

# Hands the commands to the state's +schedule_commands+ inside a
# +transaction+.
#
# @param commands [Object] passed through untouched
# @return [Object] whatever +@state.schedule_commands+ returns
def schedule_commands(commands)
  transaction { @state.schedule_commands(commands) }
end

#stats ⇒ Object



203
204
205
206
207
208
# File 'lib/sourced/backends/test_backend.rb', line 203

# Builds a +Stats+ summary of what the backend currently holds.
#
# * +stream_count+ is the number of entries in the per-stream index
# * +max_global_seq+ is the number of stored events
# * +groups+ lists the +to_h+ form of every group whose +:stream_count+
#   entry is greater than zero
#
# @return [Stats]
def stats
  Stats.new(
    @state.events_by_stream_id.size,
    events.size,
    @state.groups.values.map(&:to_h).select { |summary| summary[:stream_count].positive? }
  )
end

#transaction ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/sourced/backends/test_backend.rb', line 210

# Runs the block as a unit of work against the in-memory state.
#
# The outermost call takes the mutex, snapshots the state with +@state.copy+,
# and yields. If the block raises a StandardError the snapshot is restored
# and the error is re-raised. Calls made while a transaction is already open
# simply yield and leave rollback to the outermost call.
#
# The transaction flag and the snapshot are cleared in an +ensure+ clause, so
# they are reset however the block exits (normal return, any exception,
# +throw+ or +break+). Without this, a non-StandardError exit would leave
# +@in_tx+ set, and every later call would skip both the mutex and the
# snapshot.
#
# NOTE(review): +@in_tx+ is an instance variable rather than thread-local, so
# a second thread calling in while a transaction is open takes the nested
# path without acquiring the mutex. Confirm the intended threading model.
#
# @yield the work to perform
# @return [Object] the block's return value
# @raise [StandardError] re-raises whatever the block raised, after rollback
def transaction(&)
  return yield if @in_tx

  @mutex.synchronize do
    @in_tx = true
    @state_snapshot = @state.copy
    begin
      yield
    rescue StandardError
      # Roll back to the state captured when the transaction opened.
      @state = @state_snapshot
      raise
    ensure
      @in_tx = false
      @state_snapshot = nil
    end
  end
end