Class: Sequent::Core::EventStore

Inherits:
Object
  • Object
show all
Defined in:
lib/sequent/core/event_store.rb

Direct Known Subclasses

TenantEventStore

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration = EventStoreConfiguration.new) ⇒ EventStore

Returns a new instance of EventStore.



32
33
34
35
# File 'lib/sequent/core/event_store.rb', line 32

# Builds an event store from the given configuration.
#
# @param configuration [EventStoreConfiguration] supplies +record_class+ and
#   +event_handlers+; a fresh EventStoreConfiguration is created when omitted
def initialize(configuration = EventStoreConfiguration.new)
  # Record class through which load_events reaches the connection and table name.
  @record_class = configuration.record_class
  # Handlers that commit_events and replay_events deliver events to.
  @event_handlers = configuration.event_handlers
end

Class Attribute Details

.configuration ⇒ Object

Returns the value of attribute configuration.



18
19
20
# File 'lib/sequent/core/event_store.rb', line 18

# Reader for the +configuration+ attribute.
#
# @return [Object, nil] the current value of @configuration; nil until it has
#   been assigned
def configuration
  @configuration
end

.instance ⇒ Object

Returns the value of attribute instance.



18
19
20
# File 'lib/sequent/core/event_store.rb', line 18

# Reader for the +instance+ attribute.
#
# @return [Object, nil] the current value of @instance; nil until it has been
#   assigned
def instance
  @instance
end

Class Method Details

.configure {|configuration| ... } ⇒ Object

Creates a new EventStore and overwrites all existing config. The new EventStore can be retrieved via the EventStore.instance method.

If you don’t want a singleton, you can always instantiate one yourself using EventStore.new.

Yields:

  • (configuration) — the newly created EventStoreConfiguration, when a block is given.



26
27
28
29
30
# File 'lib/sequent/core/event_store.rb', line 26

# Replaces the current configuration with a fresh EventStoreConfiguration,
# lets the caller customise it, and installs a new EventStore built from it
# as EventStore.instance.
#
# @yield [configuration] the new configuration, only when a block is given
# @return [EventStore] the newly created store
def self.configure
  self.configuration = EventStoreConfiguration.new
  if block_given?
    yield configuration
  end

  # Read the configuration again after the block has run, then publish the
  # store built from it as the shared instance.
  store = EventStore.new(configuration)
  EventStore.instance = store
end

Instance Method Details

#commit_events(command, events) ⇒ Object

Stores the events in the EventStore and publishes the events to the registered event_handlers.



41
42
43
44
# File 'lib/sequent/core/event_store.rb', line 41

# Stores the events and then publishes them to the registered event handlers.
#
# Storing happens first, so if +store_events+ raises, nothing is published.
#
# @param command [Object] passed to +store_events+ together with the events
# @param events [Object] the events to store and publish
#   (presumably a collection of event objects — confirm against store_events)
# @return [Object] whatever +publish_events+ returns
def commit_events(command, events)
  store_events(command, events)
  publish_events(events, @event_handlers)
end

#load_events(aggregate_id) ⇒ Object

Returns all events for the aggregate, ordered by sequence_number.



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/sequent/core/event_store.rb', line 49

# Returns all events for the aggregate, ordered by sequence_number.
#
# Each row's +event_json+ is parsed with Oj and handed to the class named by
# the row's +event_type+ via +deserialize_from_json+.
#
# @param aggregate_id [#to_s] identifier of the aggregate whose events to load
# @return [Array] the deserialized events, in ascending sequence_number order
def load_events(aggregate_id)
  connection = @record_class.connection
  # Quote through the connection instead of interpolating the raw value, so an
  # id containing a quote cannot break out of the SQL string literal. The
  # +to_s+ keeps the value a quoted string literal, as the query always was.
  quoted_id = connection.quote(aggregate_id.to_s)
  query = "select event_type, event_json from #{@record_class.table_name} " \
          "where aggregate_id = #{quoted_id} order by sequence_number asc"

  # Resolve each event class only once per call.
  event_types = {}
  # +map+ rather than +map!+: works for any Enumerable result, not only Arrays.
  connection.select_all(query).map do |event_hash|
    event_type = event_hash["event_type"]
    event_json = Oj.strict_load(event_hash["event_json"])
    event_types[event_type] ||= Class.const_get(event_type.to_sym)
    event_types[event_type].deserialize_from_json(event_json)
  end
end

#replay_events ⇒ Object

Replays the events from the event stream returned by the given block to the registered event_handlers.

Parameters:

  • block

    that returns the event stream.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/sequent/core/event_store.rb', line 65

# Replays an event stream to the registered event handlers.
#
# The block must return the stream: an object responding to +each+ that yields
# rows with an "event_type" (class name) and an "event_json" (JSON payload).
# Every row is deserialized and passed to each handler in turn.
#
# @yieldreturn [#each] the event stream to replay
# @return [Object] whatever the stream's +each+ returns
def replay_events
  stream = yield
  # Memoise class lookups so each event type is resolved only once per replay.
  classes_by_name = Hash.new do |cache, type_name|
    cache[type_name] = Class.const_get(type_name.to_sym)
  end

  stream.each do |row|
    type_name = row["event_type"]
    data = Oj.strict_load(row["event_json"])
    replayed = classes_by_name[type_name].deserialize_from_json(data)
    @event_handlers.each { |handler| handler.handle_message(replayed) }
  end
end