Class: Messaging::Adapters::Postgres

Inherits:
Object
  • Object
show all
Defined in:
lib/messaging/adapters/postgres.rb,
lib/messaging/adapters/postgres/store.rb,
lib/messaging/adapters/postgres/stream.rb,
lib/messaging/adapters/postgres/streams.rb,
lib/messaging/adapters/postgres/category.rb,
lib/messaging/adapters/postgres/consumer.rb,
lib/messaging/adapters/postgres/categories.rb,
lib/messaging/adapters/postgres/create_lock.rb,
lib/messaging/adapters/postgres/release_lock.rb,
lib/messaging/adapters/postgres/categories/row.rb,
lib/messaging/adapters/postgres/serialized_message.rb,
lib/messaging/adapters/postgres/category_with_partitions.rb,
lib/messaging/adapters/postgres/advisory_transaction_lock.rb

Overview

Adapter for using Postgres and Active Record as a message store. The classes defined under this namespace implement the capabilities provided by this adapter.

Defined Under Namespace

Classes: AdvisoryTransactionLock, Categories, Category, CategoryWithPartitions, Consumer, CreateLock, ReleaseLock, SerializedMessage, Store, Stream, Streams

Instance Method Summary collapse

Instance Method Details

#create_consumer(name, **options) ⇒ Object



28
29
30
31
# File 'lib/messaging/adapters/postgres.rb', line 28

# Returns the consumer registered under +name+ for the configured app,
# creating it when no matching record is found.
#
# @param name [#to_s] consumer name; converted with +to_s+ before use
# @param options [Hash] accepted for interface compatibility; not used here
# @return [Object] the first matching Consumer, or the result of Consumer.create
def create_consumer(name, **options)
  # Both the lookup and the fallback creation use the same app/name pair.
  attributes = { app: Messaging.config.app_name, name: name.to_s }

  existing = Consumer.where(**attributes).first
  existing || Consumer.create(**attributes)
end

#create_messages_table ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/messaging/adapters/postgres.rb', line 33

# Creates the +messaging+ schema, the +messaging.messages_id_seq+ sequence,
# the list-partitioned +messaging.messages+ table and its three btree indexes
# by sending one multi-statement SQL string to +connection.execute+.
#
# The schema and sequence statements use IF NOT EXISTS; the table and index
# statements do not, so running this against a database that already has the
# table will presumably raise -- confirm before relying on re-runs.
#
# NOTE(review): +xid8+ / +pg_current_xact_id()+ look like they need a recent
# Postgres release -- verify the minimum supported server version.
#
# @return [Object] whatever +connection.execute+ returns
def create_messages_table
  # `<<-SQL` (not `<<~SQL`) keeps the leading indentation in the SQL text.
  sql = <<-SQL
    CREATE SCHEMA IF NOT EXISTS messaging;
    CREATE SEQUENCE IF NOT EXISTS messaging.messages_id_seq;

    CREATE TABLE messaging.messages (
      id bigint DEFAULT nextval('messaging.messages_id_seq'::regclass) NOT NULL,
      transaction_id xid8 DEFAULT pg_current_xact_id() NOT NULL,
      uuid uuid NOT NULL,
      stream character varying NOT NULL,
      stream_position bigint NOT NULL,
      message_type character varying NOT NULL,
      data jsonb,
      created_at timestamp without time zone NOT NULL,
      updated_at timestamp without time zone NOT NULL,
      stream_category character varying,
      stream_id character varying
    )
    PARTITION BY LIST (stream_category);

    CREATE INDEX messages_id_idx ON ONLY messaging.messages USING btree (id);
    CREATE INDEX messages_stream_category_id_idx ON ONLY messaging.messages USING btree (stream_category, id);
    CREATE INDEX messages_stream_category_stream_id_stream_position_idx ON ONLY messaging.messages USING btree (stream_category, stream_id, stream_position);
  SQL
  connection.execute sql
end