Class: EventStore::EventAppender
- Inherits:
-
Object
- Object
- EventStore::EventAppender
- Defined in:
- lib/event_store/event_appender.rb
Instance Method Summary collapse
- #append(raw_events) ⇒ Object
-
#initialize(aggregate) ⇒ EventAppender
constructor
A new instance of EventAppender.
- #store_snapshot(prepared_events) ⇒ Object
Constructor Details
#initialize(aggregate) ⇒ EventAppender
Returns a new instance of EventAppender.
4 5 6 |
# File 'lib/event_store/event_appender.rb', line 4
#
# Builds an appender bound to a single aggregate.
#
# @param aggregate [Object] the aggregate this appender writes to; the other
#   methods of this class read its +events+, +snapshot_table+ and
#   +snapshot_version_table+
def initialize(aggregate)
  @aggregate = aggregate
end
Instance Method Details
#append(raw_events) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/event_store/event_appender.rb', line 8
#
# Prepares, validates and persists a batch of events inside one database
# transaction, then refreshes the snapshot when the insert reported a result.
#
# @param raw_events [Enumerable] events to append; each one is passed to
#   +prepare_event+
# @return [Object, nil] whatever +multi_insert+ returned (+nil+ means the
#   snapshot was left untouched)
# @raise the error built by +concurrency_error+ when +has_concurrency_issue?+
#   flags a prepared event; +validate!+ presumably raises on invalid events
#   (its definition is not visible here)
def append(raw_events)
  EventStore.db.transaction do
    set_current_version

    prepared_events = raw_events.map do |raw_event|
      event = prepare_event(raw_event)
      validate!(event)
      raise concurrency_error(event) if has_concurrency_issue?(event)

      event
    end

    # All concurrency issues need to be checked before persisting any of the events.
    # Otherwise, the newly appended events may raise erroneous concurrency errors.
    result = @aggregate.events.multi_insert(prepared_events)
    store_snapshot(prepared_events) unless result.nil?
    result
  end
end
#store_snapshot(prepared_events) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/event_store/event_appender.rb', line 26
#
# Copies every event that is newer than the stored snapshot into Redis.
#
# An event qualifies when its +:version+ is greater than the version stored
# for its +:fully_qualified_name+ in the aggregate's snapshot version table
# (names with no stored version compare against -1). Qualifying events are
# written to the snapshot table as
# "version<DELIM>serialized_event<DELIM>occurred_at", and the version table
# is updated alongside, including a +:current_version+ field set to the
# version of the last qualifying event in the batch.
#
# @param prepared_events [Enumerable<Hash>] events carrying
#   +:fully_qualified_name+, +:version+, +:serialized_event+ and +:occurred_at+
# @return [Object, nil] the result of the Redis +multi+ call, or +nil+ when no
#   event qualified and nothing was written
def store_snapshot(prepared_events)
  redis = EventStore.redis
  stored_versions = redis.hgetall(@aggregate.snapshot_version_table)
  stored_versions.default = -1

  # Both arrays are flat field/value lists, the shape HMSET expects.
  snapshot_fields = []
  version_fields = []

  prepared_events.each do |event|
    name = event[:fully_qualified_name]
    # Versions read from Redis are compared numerically via to_i.
    next unless event[:version].to_i > stored_versions[name].to_i

    snapshot_fields << name
    snapshot_fields << (event[:version].to_s + EventStore::SNAPSHOT_DELIMITER +
                        event[:serialized_event] + EventStore::SNAPSHOT_DELIMITER +
                        event[:occurred_at].to_s)
    version_fields << name
    version_fields << event[:version]
  end

  return if version_fields.empty?

  # The last pushed value is the version of the last qualifying event.
  last_version = version_fields.last
  version_fields << :current_version
  version_fields << last_version.to_i

  # Queue the commands on the object yielded by `multi`, not on the outer
  # client: newer redis-rb releases only include commands sent to the yielded
  # transaction, while older releases yield the client itself.
  redis.multi do |transaction|
    transaction.hmset(@aggregate.snapshot_version_table, version_fields)
    transaction.hmset(@aggregate.snapshot_table, snapshot_fields)
  end
end