Class: EventSourcery::Postgres::EventStore
- Inherits:
-
Object
- Object
- EventSourcery::Postgres::EventStore
- Includes:
- EventStore::EachByRange
- Defined in:
- lib/event_sourcery/postgres/event_store.rb
Instance Method Summary collapse
-
#get_events_for_aggregate_id(aggregate_id) ⇒ Array
Get the events for a given aggregate id.
-
#get_next_from(id, event_types: nil, limit: 1000) ⇒ Array
Get the next set of events from the given event id.
-
#initialize(db_connection, events_table_name: EventSourcery::Postgres.config.events_table_name, lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name, event_builder: EventSourcery.config.event_builder, on_events_recorded: EventSourcery::Postgres.config.on_events_recorded) ⇒ EventStore
constructor
A new instance of EventStore.
-
#latest_event_id(event_types: nil) ⇒ Object
Get last event id for a given event types.
-
#sink(event_or_events, expected_version: nil) ⇒ Object
Like water flowing into a sink eventually it will go down the drain into the goodness of the plumbing system.
-
#subscribe(from_id:, subscription_master:, event_types: nil, after_listen: nil, &block) ⇒ Object
Subscribe to events.
Constructor Details
#initialize(db_connection, events_table_name: EventSourcery::Postgres.config.events_table_name, lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name, event_builder: EventSourcery.config.event_builder, on_events_recorded: EventSourcery::Postgres.config.on_events_recorded) ⇒ EventStore
Returns a new instance of EventStore.
8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/event_sourcery/postgres/event_store.rb', line 8 def initialize(db_connection, events_table_name: EventSourcery::Postgres.config.events_table_name, lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name, event_builder: EventSourcery.config.event_builder, on_events_recorded: EventSourcery::Postgres.config.on_events_recorded) @db_connection = db_connection @events_table_name = events_table_name @write_events_function_name = write_events_function_name @lock_table = lock_table @event_builder = event_builder @on_events_recorded = on_events_recorded end |
Instance Method Details
#get_events_for_aggregate_id(aggregate_id) ⇒ Array
Get the events for a given aggregate id.
91 92 93 94 95 |
# File 'lib/event_sourcery/postgres/event_store.rb', line 91 def get_events_for_aggregate_id(aggregate_id) events_table.where(aggregate_id: aggregate_id.to_str).order(:version).map do |event_hash| build_event(event_hash) end end |
#get_next_from(id, event_types: nil, limit: 1000) ⇒ Array
Get the next set of events from the given event id. You can specify event types and a limit. Default limit is 1000 and the default event types will be all.
60 61 62 63 64 65 66 67 68 |
# File 'lib/event_sourcery/postgres/event_store.rb', line 60 def get_next_from(id, event_types: nil, limit: 1000) query = events_table .order(:id) .where(Sequel.lit('id >= ?', id)) .limit(limit) query = query.where(type: event_types) if event_types query.map { |event_row| build_event(event_row) } end |
#latest_event_id(event_types: nil) ⇒ Object
Get last event id for a given event types.
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/event_sourcery/postgres/event_store.rb', line 75 def latest_event_id(event_types: nil) latest_event = events_table latest_event = latest_event.where(type: event_types) if event_types latest_event = latest_event.order(:id).last if latest_event latest_event[:id] else 0 end end |
#sink(event_or_events, expected_version: nil) ⇒ Object
Like water flowing into a sink eventually it will go down the drain into the goodness of the plumbing system. So to will the given events you put in this ‘sink’. Except the plumbing system is the data base events table. This can raise db connection errors.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/event_sourcery/postgres/event_store.rb', line 33 def sink(event_or_events, expected_version: nil) events = Array(event_or_events) aggregate_ids = events.map(&:aggregate_id).uniq raise AtomicWriteToMultipleAggregatesNotSupported unless aggregate_ids.count == 1 sql = write_events_sql(aggregate_ids.first, events, expected_version) @db_connection.run(sql) log_events_saved(events) on_events_recorded.call(events) true rescue Sequel::DatabaseError => e if e. =~ /Concurrency conflict/ raise ConcurrencyError, "expected version was not #{expected_version}. Error: #{e.}" end raise end |
#subscribe(from_id:, subscription_master:, event_types: nil, after_listen: nil, &block) ⇒ Object
Subscribe to events.
103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/event_sourcery/postgres/event_store.rb', line 103 def subscribe(from_id:, subscription_master:, event_types: nil, after_listen: nil, &block) poll_waiter = OptimisedEventPollWaiter.new(db_connection: @db_connection, after_listen: after_listen) args = { poll_waiter: poll_waiter, event_store: self, from_event_id: from_id, event_types: event_types, events_table_name: @events_table_name, subscription_master: subscription_master, on_new_events: block } EventSourcery::EventStore::Subscription.new(**args).tap(&:start) end |