Class: DirtyPipeline::PG::Queue
- Inherits:
-
Object
- Object
- DirtyPipeline::PG::Queue
- Defined in:
- lib/dirty_pipeline/pg/queue.rb
Constant Summary collapse
- DELETE_ACTIVE =
<<~SQL DELETE FROM dp_active_events WHERE key = $1; SQL
- DELETE_EVENTS =
<<~SQL DELETE FROM dp_event_queues WHERE key = $1; SQL
- SELECT_ALL_EVENTS =
<<~SQL SELECT payload FROM dp_event_queues WHERE key = $1 ORDER BY id DESC; SQL
- PUSH_EVENT =
<<~SQL INSERT INTO dp_event_queues (id, key, payload) VALUES (-nextval('dp_event_queues_id_seq'), $1, $2); SQL
- UNSHIFT_EVENT =
<<~SQL INSERT INTO dp_event_queues (key, payload) VALUES ($1, $2); SQL
- SELECT_LAST_EVENT =
<<~SQL SELECT id, payload FROM dp_event_queues WHERE key = $1 ORDER BY id DESC LIMIT 1; SQL
- DELETE_EVENT =
<<~SQL DELETE FROM dp_event_queues WHERE key = $1 AND id = $2; SQL
- DELETE_ACTIVE_EVENT =
<<~SQL DELETE FROM dp_active_events WHERE key = $1; SQL
- SET_EVENT_ACTIVE =
<<~SQL INSERT INTO dp_active_events (key, payload) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET payload = EXCLUDED.payload; SQL
- SELECT_ACTIVE_EVENT =
<<~SQL SELECT payload FROM dp_active_events WHERE key = $1; SQL
Class Method Summary collapse
-
.create!(connection) ⇒ Object
decoder = PG::TextDecoder::Array.new see stackoverflow.com/questions/34886260/how-do-you-decode-a-json-field-using-the-pg-gem.
- .destroy!(connection) ⇒ Object
Instance Method Summary collapse
- #clear! ⇒ Object
-
#initialize(operation, subject_class, subject_id, transaction_id) ⇒ Queue
constructor
A new instance of Queue.
- #pop ⇒ Object
- #processing_event ⇒ Object
- #push(event) ⇒ Object (also: #<<)
- #to_a ⇒ Object
- #unshift(event) ⇒ Object
- #with_postgres(&block) ⇒ Object
Constructor Details
#initialize(operation, subject_class, subject_id, transaction_id) ⇒ Queue
Returns a new instance of Queue.
32 33 34 35 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 32 def initialize(operation, subject_class, subject_id, transaction_id) @root = "dirty-pipeline-queue:#{subject_class}:#{subject_id}:" \ "op_#{operation}:txid_#{transaction_id}" end |
Class Method Details
.create!(connection) ⇒ Object
decoder = PG::TextDecoder::Array.new see stackoverflow.com/questions/34886260/how-do-you-decode-a-json-field-using-the-pg-gem
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 6 def self.create!(connection) connection.exec <<~SQL CREATE TABLE dp_active_events ( key TEXT CONSTRAINT primary_event_queues_key PRIMARY KEY, payload TEXT, created_at TIMESTAMP NOT NULL DEFAULT now() ); CREATE SEQUENCE dp_event_queues_id_seq START 1; CREATE TABLE dp_event_queues ( id BIGINT PRIMARY KEY DEFAULT nextval('dp_event_queues_id_seq'), key TEXT NOT NULL, payload TEXT, created_at TIMESTAMP NOT NULL DEFAULT now() ); SQL end |
.destroy!(connection) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 24 def self.destroy!(connection) connection.exec <<~SQL DROP TABLE IF EXISTS dp_active_events; DROP TABLE IF EXISTS dp_event_queues; DROP SEQUENCE IF EXISTS dp_event_queues_id_seq; SQL end |
Instance Method Details
#clear! ⇒ Object
48 49 50 51 52 53 54 55 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 48 def clear! with_postgres do |c| c.transaction do |tc| tc.exec(DELETE_ACTIVE, [active_event_key]) tc.exec(DELETE_EVENTS, [events_queue_key]) end end end |
#pop ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 106 def pop with_postgres do |c| c.transaction do |tc| event_id, raw_event = PG.multi(tc.exec(SELECT_LAST_EVENT, [events_queue_key])) if raw_event.nil? tc.exec(DELETE_ACTIVE_EVENT, [active_event_key]) else tc.exec(DELETE_EVENT, [events_queue_key, event_id]) tc.exec(SET_EVENT_ACTIVE, [active_event_key, raw_event]) end unpack(raw_event) end end end |
#processing_event ⇒ Object
125 126 127 128 129 130 131 132 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 125 def processing_event with_postgres do |c| raw_event = PG.single( c.exec(SELECT_ACTIVE_EVENT, [active_event_key]) ) unpack(raw_event) end end |
#push(event) ⇒ Object Also known as: <<
72 73 74 75 76 77 78 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 72 def push(event) with_postgres do |c| c.exec(PUSH_EVENT, [events_queue_key, pack(event)]) end self end |
#to_a ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 60 def to_a with_postgres do |c| c.exec(SELECT_ALL_EVENTS, [events_queue_key]).to_a.map! do |row| unpack(row.values.first) end end end |
#unshift(event) ⇒ Object
84 85 86 87 88 89 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 84 def unshift(event) with_postgres do |c| c.exec(UNSHIFT_EVENT, [events_queue_key, pack(event)]) end self end |
#with_postgres(&block) ⇒ Object
37 38 39 |
# File 'lib/dirty_pipeline/pg/queue.rb', line 37 def with_postgres(&block) DirtyPipeline.with_postgres(&block) end |