Class: EventSourcery::Memory::EventStore

Inherits:
Object
  • Object
show all
Includes:
EventStore::EachByRange
Defined in:
lib/event_sourcery/memory/event_store.rb

Overview

In-memory event store.

Note: This is not persisted and is generally used for testing.

Instance Method Summary collapse

Methods included from EventStore::EachByRange

#each_by_range

Constructor Details

#initialize(events = [], event_builder: EventSourcery.config.event_builder) ⇒ EventStore

Returns a new instance of EventStore.

Parameters:

  • events (Array) (defaults to: [])

    Optional. Collection of events

  • event_builder (defaults to: EventSourcery.config.event_builder)

    Optional. Event builder instance. Will default to Config#event_builder



12
13
14
15
16
# File 'lib/event_sourcery/memory/event_store.rb', line 12

def initialize(events = [], event_builder: EventSourcery.config.event_builder)
  @events = events
  @event_builder = event_builder
  @listeners = []
end

Instance Method Details

#add_listeners(listeners) ⇒ Object

Adds a listener or listeners to the memory store. the #process(event) method will execute whenever an event is emitted

Parameters:

  • listener

    A single listener or an array of listeners



120
121
122
# File 'lib/event_sourcery/memory/event_store.rb', line 120

def add_listeners(listeners)
  @listeners.concat(Array(listeners))
end

#ensure_one_aggregate(events) ⇒ Object

Ensure all events have the same aggregate

Parameters:

  • events (Array)

    Collection of events

Raises:

  • AtomicWriteToMultipleAggregatesNotSupported



110
111
112
113
114
# File 'lib/event_sourcery/memory/event_store.rb', line 110

def ensure_one_aggregate(events)
  unless events.map(&:aggregate_id).uniq.one?
    raise AtomicWriteToMultipleAggregatesNotSupported
  end
end

#get_events_for_aggregate_id(id) ⇒ Object

Get all events for the given aggregate

Parameters:

  • id (String)

    Aggregate ID (UUID as String)

Returns:

  • Array



85
86
87
88
# File 'lib/event_sourcery/memory/event_store.rb', line 85

def get_events_for_aggregate_id(id)
  stringified_id = id.to_str
  @events.select { |event| event.aggregate_id == stringified_id }
end

#get_next_from(id, event_types: nil, limit: 1000) ⇒ Object

Retrieve a subset of events

Parameters:

  • id

    Starting from event ID

  • event_types (Array) (defaults to: nil)

    Optional. If supplied, only retrieve events of given type(s).

  • limit (Integer) (defaults to: 1000)

    Optional. Number of events to retrieve (starting from the given event ID).

Returns:

  • Array



57
58
59
60
61
62
63
64
65
# File 'lib/event_sourcery/memory/event_store.rb', line 57

def get_next_from(id, event_types: nil, limit: 1000)
  events = if event_types.nil?
    @events
  else
    @events.select { |e| event_types.include?(e.type) }
  end

  events.select { |event| event.id >= id }.first(limit)
end

#latest_event_id(event_types: nil) ⇒ Object

Retrieve the latest event ID

Parameters:

  • event_types (Array) (defaults to: nil)

    Optional. If supplied, only retrieve events of given type(s).

Returns:

  • Integer



71
72
73
74
75
76
77
78
79
# File 'lib/event_sourcery/memory/event_store.rb', line 71

def latest_event_id(event_types: nil)
  events = if event_types.nil?
    @events
  else
    @events.select { |e| event_types.include?(e.type) }
  end

  events.empty? ? 0 : events.last.id
end

#next_version(aggregate_id) ⇒ Object

Next version for the aggregate

Parameters:

  • aggregate_id (String)

    Aggregate ID (UUID as String)

Returns:

  • Integer



94
95
96
# File 'lib/event_sourcery/memory/event_store.rb', line 94

def next_version(aggregate_id)
  version_for(aggregate_id) + 1
end

#sink(event_or_events, expected_version: nil) ⇒ Object

Store given events to the in-memory store

Parameters:

  • event_or_events

    Event(s) to be stored

  • expected_version (Optional) (defaults to: nil)

    Expected version for the aggregate. This is the version the caller of this method expect the aggregate to be in. If it’s different from the expected version a ConcurrencyError will be raised. Defaults to nil.

Returns:

  • Boolean

Raises:

  • EventSourcery::ConcurrencyError



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/event_sourcery/memory/event_store.rb', line 24

def sink(event_or_events, expected_version: nil)
  events = Array(event_or_events)
  ensure_one_aggregate(events)

  if expected_version && version_for(events.first.aggregate_id) != expected_version
    raise ConcurrencyError
  end

  events.each do |event|
    @events << @event_builder.build(
      id: @events.size + 1,
      aggregate_id: event.aggregate_id,
      type: event.type,
      version: next_version(event.aggregate_id),
      body: EventBodySerializer.serialize(event.body),
      created_at: event.created_at || Time.now.utc,
      uuid: event.uuid,
      correlation_id: event.correlation_id,
      causation_id: event.causation_id,
    )
  end

  project_events(events)

  true
end

#version_for(aggregate_id) ⇒ Object

Current version for the aggregate

Parameters:

  • aggregate_id (String)

    Aggregate ID (UUID as String)

Returns:

  • Integer



102
103
104
# File 'lib/event_sourcery/memory/event_store.rb', line 102

def version_for(aggregate_id)
  get_events_for_aggregate_id(aggregate_id).count
end