Class: Sourced::Backends::TestBackend
- Inherits:
- Object
- Object
- Sourced::Backends::TestBackend
- Defined in:
- lib/sourced/backends/test_backend.rb
Defined Under Namespace
Constant Summary collapse
- Stats =
Data.define(:stream_count, :max_global_seq, :groups)
Instance Method Summary collapse
- #ack_on(group_id, event_id) ⇒ Object
- #append_to_stream(stream_id, events) ⇒ Object
- #clear! ⇒ Object
- #events ⇒ Object
- #initialize ⇒ TestBackend (constructor): A new instance of TestBackend.
- #inspect ⇒ Object
- #installed? ⇒ Boolean
- #next_command ⇒ Object
- #read_correlation_batch(event_id) ⇒ Object
- #read_event_stream(stream_id, after: nil, upto: nil) ⇒ Object
- #reserve_next_for_reactor(reactor) ⇒ Object
- #schedule_commands(commands) ⇒ Object
- #stats ⇒ Object
- #transaction ⇒ Object
Constructor Details
#initialize ⇒ TestBackend
Returns a new instance of TestBackend.
87 88 89 90 91 92 |
# File 'lib/sourced/backends/test_backend.rb', line 87
# Builds an empty backend.
#
# Sets up the transaction bookkeeping (@tx_id, @in_tx and the Mutex used by
# #transaction) and installs a fresh state object via #clear!.
def initialize
  @tx_id = nil
  @in_tx = false
  @mutex = Mutex.new
  clear!
end
Instance Method Details
#ack_on(group_id, event_id) ⇒ Object
182 183 184 185 186 187 |
# File 'lib/sourced/backends/test_backend.rb', line 182
# Delegates to #ack_on on the group stored under +group_id+ in
# @state.groups, inside a #transaction. The caller's block is forwarded
# to the group untouched.
#
# @param group_id the key used to look the group up in @state.groups
# @param event_id the event id handed to the group's #ack_on
# @return [Object] whatever the group's #ack_on returns
def ack_on(group_id, event_id, &)
  transaction do
    @state.groups[group_id].ack_on(event_id, &)
  end
end
#append_to_stream(stream_id, events) ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/sourced/backends/test_backend.rb', line 229
# Appends +events+ to the stream identified by +stream_id+.
#
# Inside a #transaction the events are first passed to check_unique_seq!,
# then each one is recorded in the by-correlation index, the by-stream
# index, the global event list and the stream/seq index. Once the
# transaction has finished, every group in @state.groups is reindexed.
#
# @param stream_id the stream the events are appended to
# @param events [Enumerable] events responding to #correlation_id
# @return [true]
def append_to_stream(stream_id, events)
  transaction do
    check_unique_seq!(events)

    events.each do |evt|
      @state.events_by_correlation_id[evt.correlation_id] << evt
      @state.events_by_stream_id[stream_id] << evt
      @state.events << evt
      @state.stream_id_seq_index[seq_key(stream_id, evt)] = true
    end
  end

  # Reindexing happens after the transaction block has returned.
  @state.groups.each_value { |group| group.reindex }
  true
end
#clear! ⇒ Object
167 168 169 |
# File 'lib/sourced/backends/test_backend.rb', line 167
# Discards everything held by the backend by swapping @state for a brand
# new State instance.
#
# @return [State] the newly installed state object
def clear! = (@state = State.new)
#events ⇒ Object
94 |
# File 'lib/sourced/backends/test_backend.rb', line 94
# Exposes the event collection held by the current state object.
#
# @return [Object] the value of @state.events (the stored object itself,
#   not a copy)
def events
  @state.events
end
#inspect ⇒ Object
96 97 98 |
# File 'lib/sourced/backends/test_backend.rb', line 96
# Short human-readable summary: class name, number of events and number
# of entries in the by-stream index.
#
# @return [String]
def inspect
  event_total = events.size
  stream_total = @state.events_by_stream_id.size
  "<#{self.class} events:#{event_total} streams:#{stream_total}>"
end
#installed? ⇒ Boolean
171 |
# File 'lib/sourced/backends/test_backend.rb', line 171
# This backend always reports itself as installed.
#
# @return [Boolean] always true
def installed?
  true
end
#next_command ⇒ Object
195 196 197 198 199 |
# File 'lib/sourced/backends/test_backend.rb', line 195
# Runs @state.next_command inside a #transaction, forwarding the
# caller's block to it.
#
# @return [Object] whatever @state.next_command returns
def next_command(&)
  transaction { @state.next_command(&) }
end
#read_correlation_batch(event_id) ⇒ Object
244 245 246 247 248 |
# File 'lib/sourced/backends/test_backend.rb', line 244
# Finds the first stored event whose #id equals +event_id+ and returns
# the entry of the by-correlation index for that event's correlation id.
#
# @param event_id the id of the event to look up
# @return [Array] an empty array when no event has that id; otherwise
#   the value stored in @state.events_by_correlation_id
def read_correlation_batch(event_id)
  anchor = @state.events.find { |candidate| candidate.id == event_id }
  anchor ? @state.events_by_correlation_id[anchor.correlation_id] : []
end
#read_event_stream(stream_id, after: nil, upto: nil) ⇒ Object
250 251 252 253 254 255 |
# File 'lib/sourced/backends/test_backend.rb', line 250
# Reads the events recorded for +stream_id+, optionally restricted to a
# window of sequence numbers.
#
# @param stream_id the key into @state.events_by_stream_id
# @param after lower bound (exclusive) on event #seq; ignored when nil
# @param upto upper bound (inclusive) on event #seq; ignored when nil
# @return [Object] the stored collection itself when no bound is given,
#   otherwise a new array with the events inside the window
def read_event_stream(stream_id, after: nil, upto: nil)
  stream = @state.events_by_stream_id[stream_id]
  return stream unless after || upto

  stream.select do |evt|
    (!after || evt.seq > after) && (!upto || evt.seq <= upto)
  end
end
#reserve_next_for_reactor(reactor) ⇒ Object
173 174 175 176 177 178 179 180 |
# File 'lib/sourced/backends/test_backend.rb', line 173
# Asks the group registered for +reactor+ to reserve its next item.
#
# The group id and the start position are read from reactor.consumer_info
# before the transaction begins; the group lookup and the call to
# #reserve_next happen inside it. The caller's block is forwarded to
# #reserve_next.
#
# @param reactor an object responding to #consumer_info and #handled_events
# @return [Object] whatever the group's #reserve_next returns
def reserve_next_for_reactor(reactor, &)
  group_id = reactor.consumer_info.group_id
  # start_from is a callable; it is invoked once, outside the transaction.
  start_from = reactor.consumer_info.start_from.call

  transaction do
    @state.groups[group_id].reserve_next(reactor.handled_events, start_from, &)
  end
end
#schedule_commands(commands) ⇒ Object
189 190 191 192 193 |
# File 'lib/sourced/backends/test_backend.rb', line 189
# Hands +commands+ to @state.schedule_commands inside a #transaction.
#
# @param commands the commands to schedule
# @return [Object] whatever @state.schedule_commands returns
def schedule_commands(commands)
  transaction { @state.schedule_commands(commands) }
end
#stats ⇒ Object
203 204 205 206 207 208 |
# File 'lib/sourced/backends/test_backend.rb', line 203
# Summarises the backend contents as a Stats value.
#
# - stream_count: number of entries in the by-stream index
# - max_global_seq: number of stored events
# - groups: the #to_h of every group whose :stream_count is greater than 0
#
# @return [Stats]
def stats
  Stats.new(
    @state.events_by_stream_id.size,
    events.size,
    @state.groups.values.map(&:to_h).select { |summary| summary[:stream_count].positive? }
  )
end
#transaction ⇒ Object
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/sourced/backends/test_backend.rb', line 210
# Runs the given block as one all-or-nothing unit against @state.
#
# The outermost call takes @mutex, snapshots the state with @state.copy and
# yields. If the block raises a StandardError, @state is replaced by the
# snapshot and the error is re-raised. A call made while the current thread
# already holds @mutex just yields, joining the enclosing transaction; the
# outermost call alone is responsible for rollback.
#
# @yield the work to perform
# @return [Object] whatever the block returns
# @raise [StandardError] re-raised from the block after rolling back
def transaction(&)
  # Re-entrancy is decided by lock ownership rather than by the shared
  # @in_tx flag: another thread can no longer skip the mutex while a
  # transaction is running, and a failing nested call no longer resets the
  # flag (which made the next nested call re-lock the mutex recursively).
  return yield if @mutex.owned?

  @mutex.synchronize do
    @in_tx = true
    @state_snapshot = @state.copy
    yield
  rescue StandardError
    @state = @state_snapshot if @state_snapshot
    raise
  ensure
    # Runs on every exit path, including non-StandardError exceptions, so
    # neither the flag nor the snapshot leaks into the next transaction.
    @in_tx = false
    @state_snapshot = nil
  end
end