Class: EventStore::EventAppender

Inherits:
Object
  • Object
show all
Defined in:
lib/event_store/event_appender.rb

Instance Method Summary collapse

Constructor Details

#initialize(aggregate) ⇒ EventAppender

Returns a new instance of EventAppender.



4
5
6
# File 'lib/event_store/event_appender.rb', line 4

def initialize aggregate
  @aggregate = aggregate
end

Instance Method Details

#append(raw_events) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/event_store/event_appender.rb', line 8

def append raw_events
  EventStore.db.transaction do
    set_current_version

    prepared_events = raw_events.map do |raw_event|
      event = prepare_event(raw_event)
      validate! event
      raise concurrency_error(event) if has_concurrency_issue?(event)
      event
    end
    # All concurrency issues need to be checked before persisting any of the events
    # Otherwise, the newly appended events may raise erroneous concurrency errors
    result = @aggregate.events.multi_insert(prepared_events)
    store_snapshot(prepared_events) unless result.nil?
    result
  end
end

#store_snapshot(prepared_events) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/event_store/event_appender.rb', line 26

def store_snapshot(prepared_events)
  r = EventStore.redis
  current_version_numbers = r.hgetall(@aggregate.snapshot_version_table)
  current_version_numbers.default = -1
  valid_snapshot_events = []
  valid_snapshot_versions = []
  prepared_events.each do |event|
    if event[:version].to_i > current_version_numbers[event[:fully_qualified_name]].to_i
      valid_snapshot_events   << event[:fully_qualified_name]
      valid_snapshot_events   << (event[:version].to_s + EventStore::SNAPSHOT_DELIMITER + event[:serialized_event] + EventStore::SNAPSHOT_DELIMITER + event[:occurred_at].to_s)
      valid_snapshot_versions << event[:fully_qualified_name]
      valid_snapshot_versions << event[:version]
    end
  end
  unless valid_snapshot_versions.empty?
    last_version            = valid_snapshot_versions.last
    valid_snapshot_versions << :current_version
    valid_snapshot_versions << last_version.to_i
    r.multi do
      r.hmset(@aggregate.snapshot_version_table, valid_snapshot_versions)
      r.hmset(@aggregate.snapshot_table, valid_snapshot_events)
    end
  end
end