Class: EventStoreRuby::PostgresEventStore

Inherits:
Object
  • Object
show all
Defined in:
lib/eventstore_ruby/postgres_event_store.rb

Overview

Postgres-backed EventStore implementation.

Defined Under Namespace

Classes: Options

Constant Summary collapse

NON_EXISTENT_EVENT_TYPE =
"__NON_EXISTENT__#{rand(36**8).to_s(36)}".freeze

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ PostgresEventStore

Returns a new instance of PostgresEventStore.



12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/eventstore_ruby/postgres_event_store.rb', line 12

def initialize(options = {})
  opts = Options.new(options)
  conn_str = opts.connection_string || ENV['DATABASE_URL']
  raise 'eventstore-stores-postgres-err02: Connection string missing. DATABASE_URL environment variable not set.' unless conn_str

  @database_name = Postgres.get_database_name_from_connection_string(conn_str)
  raise "eventstore-stores-postgres-err03: Database name not found. Invalid connection string: #{conn_str}" unless @database_name

  @conn_str = conn_str
  @notifier = opts.notifier || MemoryEventStreamNotifier.new
  @conn = PG.connect(conn_str)
end

Instance Method Details

#append(events, filter = nil, expected_max_sequence_number: nil) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/eventstore_ruby/postgres_event_store.rb', line 39

def append(events, filter = nil, expected_max_sequence_number: nil)
  return if events.empty?

  # Normalize the +filter+ to always be an EventQuery so the SQL helpers
  # can work consistently. The TypeScript original expects an EventQuery
  # (logical OR combination of EventFilter objects).

  if filter.nil?
    # No filter supplied – treat this as an "append to empty stream" use
    # case.  Create a dummy filter that can never match so the CTE INSERT
    # only succeeds when the table is still empty (max_seq = 0).
    dummy_filter = EventStoreRuby.create_filter([NON_EXISTENT_EVENT_TYPE])
    filter = EventStoreRuby.create_query([dummy_filter])
    expected_max_sequence_number = 0
  elsif filter.is_a?(EventFilter)
    # Wrap single EventFilter in an EventQuery for uniform handling
    filter = EventStoreRuby.create_query([filter])
  end

  # At this point +filter+ is guaranteed to be an EventQuery with at least
  # one inner EventFilter, so no further guard needed.

  raise 'eventstore-stores-postgres-err04: Expected max sequence number is required when a filter is provided!' if expected_max_sequence_number.nil?

  cte_sql, context_params = Postgres::InsertBuilder.build_cte_insert_query(filter, expected_max_sequence_number)
  params = Postgres::Transform.prepare_insert_params(events, context_params)

  result = @conn.exec_params(cte_sql, params)
  if result.ntuples.zero?
    raise 'eventstore-stores-postgres-err05: Context changed: events were modified between query() and append()'
  end

  inserted_events = Postgres::Transform.map_records_to_events(result)
  @notifier.notify(inserted_events)
end

#closeObject



81
82
83
84
# File 'lib/eventstore_ruby/postgres_event_store.rb', line 81

def close
  @notifier.close
  @conn.close
end

#initialize_databaseObject

Creates the database (if possible) and tables/indexes



76
77
78
79
# File 'lib/eventstore_ruby/postgres_event_store.rb', line 76

def initialize_database
  create_database
  create_table_and_indexes
end

#query(filter) ⇒ Object

Returns QueryResult



26
27
28
29
30
31
32
33
# File 'lib/eventstore_ruby/postgres_event_store.rb', line 26

def query(filter)
  sql, params = Postgres::QueryBuilder.build_context_query(filter)
  result = @conn.exec_params(sql, params)
  QueryResult.new(
    events: Postgres::Transform.map_records_to_events(result),
    max_sequence_number: Postgres::Transform.extract_max_sequence_number(result)
  )
end

#subscribe(&handle) ⇒ Object



35
36
37
# File 'lib/eventstore_ruby/postgres_event_store.rb', line 35

def subscribe(&handle)
  @notifier.subscribe(handle)
end