Class: EventSourcery::Postgres::EventStore

Inherits:
Object
Includes:
EventStore::EachByRange
Defined in:
lib/event_sourcery/postgres/event_store.rb

Overview

PostgreSQL implementation of an event store for persisting and retrieving domain events.

Constructor Details

#initialize(db_connection, events_table_name: EventSourcery::Postgres.config.events_table_name, lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name, event_builder: EventSourcery.config.event_builder, on_events_recorded: EventSourcery::Postgres.config.on_events_recorded) ⇒ EventStore

Returns a new instance of EventStore.



# File 'lib/event_sourcery/postgres/event_store.rb', line 9

def initialize(db_connection,
               events_table_name: EventSourcery::Postgres.config.events_table_name,
               lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth,
               write_events_function_name: EventSourcery::Postgres.config.write_events_function_name,
               event_builder: EventSourcery.config.event_builder,
               on_events_recorded: EventSourcery::Postgres.config.on_events_recorded)
  @db_connection = db_connection
  @events_table_name = events_table_name
  @write_events_function_name = write_events_function_name
  @lock_table = lock_table
  @event_builder = event_builder
  @on_events_recorded = on_events_recorded
end

Instance Method Details

#get_events_for_aggregate_id(aggregate_id) ⇒ Array

Get the events for a given aggregate id, ordered by version.

Parameters:

  • aggregate_id: the aggregate id to filter for

Returns:

  • array of found events



# File 'lib/event_sourcery/postgres/event_store.rb', line 92

def get_events_for_aggregate_id(aggregate_id)
  events_table.where(aggregate_id: aggregate_id.to_str).order(:version).map do |event_hash|
    build_event(event_hash)
  end
end
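
The filter and ordering above can be illustrated without a database. This is a minimal sketch, assuming plain Ruby hashes stand in for rows of the events table; the helper name `events_for_aggregate` and the sample rows are hypothetical and not part of the library.

```ruby
# Mirrors where(aggregate_id: ...).order(:version) on an array of row hashes.
def events_for_aggregate(rows, aggregate_id)
  rows
    .select { |row| row[:aggregate_id] == aggregate_id.to_str }
    .sort_by { |row| row[:version] }
end

# Hypothetical sample rows, deliberately out of version order.
rows = [
  { aggregate_id: 'cart-1', version: 2, type: 'item_removed' },
  { aggregate_id: 'cart-2', version: 1, type: 'item_added' },
  { aggregate_id: 'cart-1', version: 1, type: 'item_added' }
]

history = events_for_aggregate(rows, 'cart-1')
puts history.map { |row| row[:type] }.inspect
```

Because the result is ordered by version, it can be replayed in order to rebuild the aggregate's state.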

#get_next_from(id, event_types: nil, limit: 1000) ⇒ Array

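Get the next batch of events, ordered by id, starting at the given event id. The starting id is inclusive, since the query selects rows with id >= the given id. You can optionally filter by event types and cap the batch size. By default all event types are returned and the limit is 1000.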

Parameters:

  • id: the event id to get next events from (inclusive)

  • event_types (defaults to: nil)

    the event types to filter, default nil = all

  • limit (defaults to: 1000)

    the maximum number of events to return, default 1000

Returns:

  • array of found events



# File 'lib/event_sourcery/postgres/event_store.rb', line 61

def get_next_from(id, event_types: nil, limit: 1000)
  query =
    events_table
    .order(:id)
    .where(Sequel.lit('id >= ?', id))
    .limit(limit)
  query = query.where(type: event_types) if event_types
  query.map { |event_row| build_event(event_row) }
end
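
Since the lower bound is inclusive, a caller paging through the store has to ask for the last seen id plus one. The following is a minimal sketch of that pattern, assuming an in-memory array of row hashes in place of the events table; `next_from` and `each_batch` are hypothetical helper names, not library methods.

```ruby
# Mirrors the query: id >= given id, optional type filter, ordered by id, capped by limit.
def next_from(rows, id, event_types: nil, limit: 1000)
  result = rows.select { |row| row[:id] >= id }
  result = result.select { |row| Array(event_types).include?(row[:type]) } if event_types
  result.sort_by { |row| row[:id] }.first(limit)
end

# Pages through all rows; the next batch starts one past the last id seen,
# because the lower bound is inclusive.
def each_batch(rows, batch_size:)
  from_id = 0
  loop do
    batch = next_from(rows, from_id, limit: batch_size)
    break if batch.empty?

    yield batch
    from_id = batch.last[:id] + 1
  end
end

# Hypothetical sample rows with ids 1 to 5.
rows = (1..5).map { |n| { id: n, type: n.odd? ? 'item_added' : 'item_removed' } }
each_batch(rows, batch_size: 2) { |batch| puts batch.map { |row| row[:id] }.inspect }
```

Passing the last seen id unchanged would return that event again at the start of the next batch.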

#latest_event_id(event_types: nil) ⇒ Object

Get the id of the latest event, optionally restricted to the given event types. Returns 0 when no matching event exists.

Parameters:

  • event_types (defaults to: nil)

    the type of event(s) to filter

Returns:

  • the latest event id



# File 'lib/event_sourcery/postgres/event_store.rb', line 76

def latest_event_id(event_types: nil)
  latest_event = events_table
  latest_event = latest_event.where(type: event_types) if event_types
  latest_event = latest_event.order(:id).last
  if latest_event
    latest_event[:id]
  else
    0
  end
end
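
One way to use this value is to check how far a reader has fallen behind. This is a minimal sketch over in-memory row hashes; `latest_id`, `last_processed`, and the sample rows are hypothetical names and data chosen for illustration.

```ruby
# Mirrors latest_event_id: highest id among matching rows, or 0 when none match.
def latest_id(rows, event_types: nil)
  rows = rows.select { |row| Array(event_types).include?(row[:type]) } if event_types
  latest = rows.max_by { |row| row[:id] }
  latest ? latest[:id] : 0
end

# Hypothetical sample rows.
rows = [
  { id: 1, type: 'item_added' },
  { id: 2, type: 'item_removed' },
  { id: 3, type: 'item_added' }
]

# Hypothetical position of a reader that has handled event 1 only.
last_processed = 1
behind_by = latest_id(rows) - last_processed
puts "processor is #{behind_by} ids behind"
```

The difference is measured in ids rather than events, since ids are not guaranteed to be contiguous for a given type filter.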

#sink(event_or_events, expected_version: nil) ⇒ Object

Like water flowing into a sink, eventually it will go down the drain into the goodness of the plumbing system. So too will the given events you put in this ‘sink’. Except the plumbing system is the database events table. All events passed in one call must belong to the same aggregate. This can raise database connection errors.

Parameters:

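  • event_or_events: the event or events to save

  • expected_version (defaults to: nil)

    the version the aggregate is expected to be at when the events are written; a mismatch is reported as a concurrency conflict. Default nil.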

Raises:

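  • (Sequel::DatabaseError) if something goes wrong with the database

  • (ConcurrencyError) if there was a concurrency conflict

  • (AtomicWriteToMultipleAggregatesNotSupported) unless the events belong to exactly one aggregate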



# File 'lib/event_sourcery/postgres/event_store.rb', line 34

def sink(event_or_events, expected_version: nil)
  events = Array(event_or_events)
  aggregate_ids = events.map(&:aggregate_id).uniq
  raise AtomicWriteToMultipleAggregatesNotSupported unless aggregate_ids.count == 1

  sql = write_events_sql(aggregate_ids.first, events, expected_version)
  @db_connection.run(sql)
  log_events_saved(events)
  on_events_recorded.call(events)
  true
rescue Sequel::DatabaseError => e
  if e.message =~ /Concurrency conflict/
    raise ConcurrencyError, "expected version was not #{expected_version}. Error: #{e.message}"
  end

  raise
end
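
The single-aggregate guard and the expected-version check can be sketched in memory. This is a simplified illustration, not the library's implementation: the real check happens inside the Postgres write function, and the sketch assumes that an aggregate's version equals the number of events already stored for it. `InMemorySink`, `SketchEvent`, and both error classes are hypothetical names.

```ruby
# Hypothetical stand-ins; the real error classes live in the library.
class SketchConcurrencyError < StandardError; end
class SketchMultipleAggregatesError < StandardError; end

# Minimal event object with just the fields this sketch needs.
class SketchEvent
  attr_reader :aggregate_id, :type

  def initialize(aggregate_id:, type:)
    @aggregate_id = aggregate_id
    @type = type
  end
end

class InMemorySink
  attr_reader :rows

  def initialize
    @rows = []
  end

  def sink(event_or_events, expected_version: nil)
    events = Array(event_or_events)
    aggregate_ids = events.map(&:aggregate_id).uniq
    raise SketchMultipleAggregatesError unless aggregate_ids.count == 1

    # Assumption: an aggregate's version equals the number of events stored for it.
    current_version = @rows.count { |row| row[:aggregate_id] == aggregate_ids.first }
    if expected_version && expected_version != current_version
      raise SketchConcurrencyError, "expected version was not #{expected_version}"
    end

    events.each_with_index do |event, index|
      @rows << { aggregate_id: event.aggregate_id, type: event.type,
                 version: current_version + index + 1 }
    end
    true
  end
end

store = InMemorySink.new
store.sink(SketchEvent.new(aggregate_id: 'cart-1', type: 'item_added'), expected_version: 0)

begin
  # A second writer that still believes the aggregate is at version 0 is rejected.
  store.sink(SketchEvent.new(aggregate_id: 'cart-1', type: 'item_added'), expected_version: 0)
rescue SketchConcurrencyError => e
  puts "conflict: #{e.message}"
end
```

A caller that hits the conflict would typically reload the aggregate's events and retry with the new version.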

#subscribe(from_id:, subscription_master:, event_types: nil, after_listen: nil, &block) ⇒ Object

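Subscribe to events. Builds an EventSourcery::EventStore::Subscription, starts it, and returns it. The given block is passed to the subscription as on_new_events.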

Parameters:

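  • from_id: the event id to start the subscription from (required keyword argument)

  • event_types (defaults to: nil)

    the event types to subscribe to, default all.

  • after_listen (defaults to: nil)

    the after-listen callback, default nil.

  • subscription_master: the subscription master, passed through to the subscription

  • block: called when new events arrive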



# File 'lib/event_sourcery/postgres/event_store.rb', line 104

def subscribe(from_id:, subscription_master:, event_types: nil, after_listen: nil, &block)
  poll_waiter = OptimisedEventPollWaiter.new(db_connection: @db_connection, after_listen: after_listen)
  args = {
    poll_waiter: poll_waiter,
    event_store: self,
    from_event_id: from_id,
    event_types: event_types,
    events_table_name: @events_table_name,
    subscription_master: subscription_master,
    on_new_events: block
  }
  EventSourcery::EventStore::Subscription.new(**args).tap(&:start)
end