Class: Messaging::Adapters::Postgres
- Inherits:
-
Object
- Object
- Messaging::Adapters::Postgres
- Defined in:
- lib/messaging/adapters/postgres.rb,
lib/messaging/adapters/postgres/store.rb,
lib/messaging/adapters/postgres/stream.rb,
lib/messaging/adapters/postgres/streams.rb,
lib/messaging/adapters/postgres/category.rb,
lib/messaging/adapters/postgres/consumer.rb,
lib/messaging/adapters/postgres/categories.rb,
lib/messaging/adapters/postgres/create_lock.rb,
lib/messaging/adapters/postgres/release_lock.rb,
lib/messaging/adapters/postgres/categories/row.rb,
lib/messaging/adapters/postgres/serialized_message.rb,
lib/messaging/adapters/postgres/category_with_partitions.rb,
lib/messaging/adapters/postgres/advisory_transaction_lock.rb
Overview
Adapter for using Postgres and Active Record as a message store. capabilities provided by this adapter.
Defined Under Namespace
Classes: AdvisoryTransactionLock, Categories, Category, CategoryWithPartitions, Consumer, CreateLock, ReleaseLock, SerializedMessage, Store, Stream, Streams
Instance Method Summary collapse
Instance Method Details
#create_consumer(name, **options) ⇒ Object
28 29 30 31 |
# File 'lib/messaging/adapters/postgres.rb', line 28 def create_consumer(name, **) Consumer.where(app: Messaging.config.app_name, name: name.to_s).first || Consumer.create(app: Messaging.config.app_name, name: name.to_s) end |
#create_messages_table ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/messaging/adapters/postgres.rb', line 33 def sql = " CREATE SCHEMA IF NOT EXISTS messaging;\n CREATE SEQUENCE IF NOT EXISTS messaging.messages_id_seq;\n\n CREATE TABLE messaging.messages (\n id bigint DEFAULT nextval('messaging.messages_id_seq'::regclass) NOT NULL,\n transaction_id xid8 DEFAULT pg_current_xact_id() NOT NULL,\n uuid uuid NOT NULL,\n stream character varying NOT NULL,\n stream_position bigint NOT NULL,\n message_type character varying NOT NULL,\n data jsonb,\n created_at timestamp without time zone NOT NULL,\n updated_at timestamp without time zone NOT NULL,\n stream_category character varying,\n stream_id character varying\n )\n PARTITION BY LIST (stream_category);\n\n CREATE INDEX messages_id_idx ON ONLY messaging.messages USING btree (id);\n CREATE INDEX messages_stream_category_id_idx ON ONLY messaging.messages USING btree (stream_category, id);\n CREATE INDEX messages_stream_category_stream_id_stream_position_idx ON ONLY messaging.messages USING btree (stream_category, stream_id, stream_position);\n SQL\n connection.execute sql\nend\n" |