Class: Messaging::Adapters::Postgres

Inherits:
Object
  • Object
show all
Defined in:
lib/messaging/adapters/postgres.rb,
lib/messaging/adapters/postgres/store.rb,
lib/messaging/adapters/postgres/stream.rb,
lib/messaging/adapters/postgres/streams.rb,
lib/messaging/adapters/postgres/category.rb,
lib/messaging/adapters/postgres/categories.rb,
lib/messaging/adapters/postgres/categories/row.rb,
lib/messaging/adapters/postgres/serialized_message.rb,
lib/messaging/adapters/postgres/category_with_partitions.rb,
lib/messaging/adapters/postgres/advisory_transaction_lock.rb

Overview

Adapter for using Postgres and Active Record as a message store. capabilities provided by this adapter.

Defined Under Namespace

Classes: AdvisoryTransactionLock, Categories, Category, CategoryWithPartitions, SerializedMessage, Store, Stream, Streams

Instance Method Summary collapse

Instance Method Details

#create_messages_tableObject



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/messaging/adapters/postgres.rb', line 22

def create_messages_table
  sql = <<~SQL
    CREATE SCHEMA IF NOT EXISTS messaging;
    CREATE SEQUENCE IF NOT EXISTS messaging.messages_id_seq;

    CREATE TABLE messaging.messages (
      id bigint DEFAULT nextval('messaging.messages_id_seq'::regclass) NOT NULL,
      uuid uuid NOT NULL,
      stream character varying NOT NULL,
      stream_position bigint NOT NULL,
      message_type character varying NOT NULL,
      data jsonb,
      created_at timestamp without time zone NOT NULL,
      updated_at timestamp without time zone NOT NULL,
      stream_category character varying,
      stream_id character varying
    )
    PARTITION BY LIST (stream_category);

    CREATE INDEX messages_id_idx ON ONLY messaging.messages USING btree (id);
    CREATE INDEX messages_stream_category_id_idx ON ONLY messaging.messages USING btree (stream_category, id);
    CREATE INDEX messages_stream_category_stream_id_stream_position_idx ON ONLY messaging.messages USING btree (stream_category, stream_id, stream_position);
  SQL
  connection.execute sql
end