Class: EventSourcery::Postgres::EventStore

Inherits:
Object
  • Object
show all
Includes:
EventStore::EachByRange
Defined in:
lib/event_sourcery/postgres/event_store.rb

Instance Method Summary collapse

Constructor Details

#initialize(db_connection, events_table_name: EventSourcery::Postgres.config.events_table_name, lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name, event_builder: EventSourcery.config.event_builder) ⇒ EventStore

Returns a new instance of EventStore.



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/event_sourcery/postgres/event_store.rb', line 6

def initialize(db_connection,
               events_table_name: EventSourcery::Postgres.config.events_table_name,
               lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth,
               write_events_function_name: EventSourcery::Postgres.config.write_events_function_name,
               event_builder: EventSourcery.config.event_builder)
  @db_connection = db_connection
  @events_table_name = events_table_name
  @write_events_function_name = write_events_function_name
  @lock_table = lock_table
  @event_builder = event_builder
end

Instance Method Details

#get_events_for_aggregate_id(aggregate_id) ⇒ Array

Get the events for a given aggregate id.

Parameters:

  • aggregate_id

    the aggregate id to filter for

Returns:

  • (Array)

    of found events



84
85
86
87
88
# File 'lib/event_sourcery/postgres/event_store.rb', line 84

def get_events_for_aggregate_id(aggregate_id)
  events_table.where(aggregate_id: aggregate_id.to_str).order(:version).map do |event_hash|
    build_event(event_hash)
  end
end

#get_next_from(id, event_types: nil, limit: 1000) ⇒ Array

Get the next set of events from the given event id. You can specify event types and a limit. Default limit is 1000 and the default event types will be all.

Parameters:

  • id

    the event id to get next events from

  • event_types (defaults to: nil)

    the event types to filter, default nil = all

  • limit (defaults to: 1000)

    the limit to the results, default 1000

Returns:

  • (Array)

    array of found events



54
55
56
57
58
59
60
61
# File 'lib/event_sourcery/postgres/event_store.rb', line 54

def get_next_from(id, event_types: nil, limit: 1000)
  query = events_table.
    order(:id).
    where(Sequel.lit('id >= ?', id)).
    limit(limit)
  query = query.where(type: event_types) if event_types
  query.map { |event_row| build_event(event_row) }
end

#latest_event_id(event_types: nil) ⇒ Object

Get last event id for a given event types.

Parameters:

  • event_types (defaults to: nil)

    the type of event(s) to filter

Returns:

  • the latest event id



68
69
70
71
72
73
74
75
76
77
# File 'lib/event_sourcery/postgres/event_store.rb', line 68

def latest_event_id(event_types: nil)
  latest_event = events_table
  latest_event = latest_event.where(type: event_types) if event_types
  latest_event = latest_event.order(:id).last
  if latest_event
    latest_event[:id]
  else
    0
  end
end

#sink(event_or_events, expected_version: nil) ⇒ Object

Like water flowing into a sink eventually it will go down the drain into the goodness of the plumbing system. So to will the given events you put in this ‘sink’. Except the plumbing system is the data base events table. This can raise db connection errors.

Parameters:

  • event_or_events

    the event or events to save

  • expected_version (defaults to: nil)

    the version to save with the event, default nil

Raises:

  • (DatabaseError)

    if something goes wrong with the database

  • (ConcurrencyError)

    if there was a concurrency conflict



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/event_sourcery/postgres/event_store.rb', line 29

def sink(event_or_events, expected_version: nil)
  events = Array(event_or_events)
  aggregate_ids = events.map(&:aggregate_id).uniq
  raise AtomicWriteToMultipleAggregatesNotSupported unless aggregate_ids.count == 1
  sql = write_events_sql(aggregate_ids.first, events, expected_version)
  @db_connection.run(sql)
  log_events_saved(events)
  true
rescue Sequel::DatabaseError => e
  if e.message =~ /Concurrency conflict/
    raise ConcurrencyError, "expected version was not #{expected_version}. Error: #{e.message}"
  else
    raise
  end
end

#subscribe(from_id:, event_types: nil, after_listen: nil, subscription_master:, &block) ⇒ Object

Subscribe to events.

Parameters:

  • from_id

    subscribe from a starting event id. default will be from the start.

  • event_types (defaults to: nil)

    the event_types to subscribe to, default all.

  • after_listen (defaults to: nil)

    the after listen call back block. default nil.

  • subscription_master

    the subscription master block



96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/event_sourcery/postgres/event_store.rb', line 96

def subscribe(from_id:, event_types: nil, after_listen: nil, subscription_master:, &block)
  poll_waiter = OptimisedEventPollWaiter.new(db_connection: @db_connection, after_listen: after_listen)
  args = {
    poll_waiter: poll_waiter,
    event_store: self,
    from_event_id: from_id,
    event_types: event_types,
    events_table_name: @events_table_name,
    subscription_master: subscription_master,
    on_new_events: block
  }
  EventSourcery::EventStore::Subscription.new(args).tap(&:start)
end