Class: AggregateRoot::SnapshotRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/aggregate_root/snapshot_repository.rb

Constant Summary collapse

# Default value of the constructor's `interval` argument.
# Integers are immutable and always frozen, so no explicit `freeze` is needed.
DEFAULT_SNAPSHOT_INTERVAL = 100
# Maps a base stream name to the same name with the "_snapshots" suffix
# appended.
SNAPSHOT_STREAM_PATTERN = lambda do |base_stream_name|
  "#{base_stream_name}_snapshots"
end
# Error rescued by #load around the snapshot restore; it is handed to the
# error handler instead of propagating.
class NotRestorableSnapshot < StandardError; end

# Error rescued by #store around the snapshot publication; it is handed to
# the error handler instead of propagating.
class NotDumpableAggregateRoot < StandardError; end
# Event class used for snapshots: a plain subclass of RubyEventStore::Event
# that adds no behavior of its own.
class Snapshot < RubyEventStore::Event; end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(event_store, interval = DEFAULT_SNAPSHOT_INTERVAL) ⇒ SnapshotRepository

Returns a new instance of SnapshotRepository.

Raises:

  • (ArgumentError) — if interval is not an Integer or is not greater than 0


13
14
15
16
17
18
19
# File 'lib/aggregate_root/snapshot_repository.rb', line 13

# Validates the interval and stores the collaborators on the instance.
#
# @param event_store [Object] kept as-is in @event_store
# @param interval [Integer] must be exactly an Integer and greater than 0;
#   kept in @interval
# @raise [ArgumentError] when interval is not an Integer, or is not
#   greater than 0
def initialize(event_store, interval = DEFAULT_SNAPSHOT_INTERVAL)
  # Exact-class check: anything that is not a genuine Integer is rejected
  # before the numeric comparison is attempted.
  raise ArgumentError, 'interval must be an Integer' unless interval.instance_of?(Integer)
  raise ArgumentError, 'interval must be greater than 0' unless interval.positive?

  # Default handler accepts one argument and does nothing with it.
  @error_handler = ->(_error) {}
  @interval = interval
  @event_store = event_store
end

Instance Attribute Details

#error_handler=(value) ⇒ Object

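Sets the error_handler attribute: the callable that #load and #store invoke with the rescued exception when a snapshot cannot be restored or published.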

Parameters:

  • value

    the value to set the attribute error_handler to.



21
22
23
# File 'lib/aggregate_root/snapshot_repository.rb', line 21

# Replaces the handler kept in @error_handler.
#
# @param value [Object] the new handler; expected to respond to #call,
#   since #load and #store invoke the handler with the rescued exception
# @return [Object] the value that was assigned
def error_handler=(value)
  instance_variable_set(:@error_handler, value)
end

Instance Method Details

#load(aggregate, stream_name) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/aggregate_root/snapshot_repository.rb', line 25

# Populates an aggregate from the events of a stream, starting from the
# latest snapshot when one exists and can be restored.
#
# @param aggregate [Object] aggregate to replay events into when no usable
#   snapshot is available
# @param stream_name [Object] name of the stream to read events from
# @return [Object] the aggregate restored from the snapshot, or the one
#   passed in when no snapshot was used
def load(aggregate, stream_name)
  snapshot = load_snapshot_event(stream_name)
  events = event_store.read.stream(stream_name)

  if snapshot
    begin
      aggregate = load_marshal(snapshot)
    rescue NotRestorableSnapshot => error
      # Unusable snapshot: report it and fall back to replaying the whole
      # stream into the aggregate that was passed in.
      error_handler.call(error)
    else
      # Snapshot restored: continue from the version and event it recorded.
      aggregate.version = snapshot.data.fetch(:version)
      events = events.from(snapshot.data.fetch(:last_event_id))
    end
  end

  # reduce is used purely for iteration; the accumulator is ignored.
  events.reduce { |_, event| aggregate.apply(event) }

  # Advance the version by the number of events the aggregate reports as
  # unpublished after the replay.
  aggregate.version += aggregate.unpublished_events.count
  aggregate
end

#store(aggregate, stream_name) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/aggregate_root/snapshot_repository.rb', line 43

# Publishes the aggregate's unpublished events to the stream, advances the
# aggregate's version, and publishes a snapshot when one is due.
#
# @param aggregate [Object] aggregate whose unpublished events are stored
# @param stream_name [Object] name of the stream to publish to
# @return [Object, nil] nil when no snapshot is due; otherwise the result of
#   publishing the snapshot, or of the error handler when that fails
def store(aggregate, stream_name)
  pending = aggregate.unpublished_events.to_a
  event_store.publish(pending, stream_name: stream_name, expected_version: aggregate.version)
  aggregate.version += pending.count

  return unless time_for_snapshot?(aggregate.version, pending.size)

  # Only the snapshot step is guarded: a failure to dump the aggregate is
  # reported to the handler and does not undo the publish above.
  begin
    publish_snapshot_event(aggregate, stream_name, pending.last.event_id)
  rescue NotDumpableAggregateRoot => error
    error_handler.call(error)
  end
end