Class: DirtyPipeline::PG::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/dirty_pipeline/pg/queue.rb

Constant Summary collapse

# Removes the dp_active_events row stored under key $1 (used by #clear!).
DELETE_ACTIVE =
<<~SQL
  DELETE FROM dp_active_events WHERE key = $1;
SQL
# Removes every queued row in dp_event_queues stored under key $1.
DELETE_EVENTS =
<<~SQL
  DELETE FROM dp_event_queues WHERE key = $1;
SQL
# Reads all queued payloads for key $1, greatest id first. This is the same
# ordering SELECT_LAST_EVENT uses to choose the next row.
SELECT_ALL_EVENTS =
<<~SQL
  SELECT payload FROM dp_event_queues WHERE key = $1 ORDER BY id DESC;
SQL
# Appends a row for key $1 with payload $2. The id is the NEGATED next
# sequence value, so every pushed row has a negative id that keeps decreasing.
# Under "ORDER BY id DESC" earlier pushes therefore sort before later pushes,
# and all pushed rows sort after rows inserted through UNSHIFT_EVENT.
PUSH_EVENT =
<<~SQL
  INSERT INTO dp_event_queues (id, key, payload)
  VALUES (-nextval('dp_event_queues_id_seq'), $1, $2);
SQL
# Inserts a row for key $1 with payload $2 without naming the id column, so
# the column default applies (a plain nextval in the schema created by
# .create!). Such rows sort ahead of pushed rows under "ORDER BY id DESC".
UNSHIFT_EVENT =
<<~SQL
  INSERT INTO dp_event_queues (key, payload) VALUES ($1, $2);
SQL
# Reads id and payload of the single row with the greatest id for key $1.
SELECT_LAST_EVENT =
<<~SQL
  SELECT id, payload FROM dp_event_queues
  WHERE key = $1
  ORDER BY id DESC LIMIT 1;
SQL
# Removes one queued row identified by key $1 and id $2.
DELETE_EVENT =
<<~SQL
  DELETE FROM dp_event_queues WHERE key = $1 AND id = $2;
SQL
# Same statement text as DELETE_ACTIVE; this is the constant #pop refers to.
DELETE_ACTIVE_EVENT =
<<~SQL
  DELETE FROM dp_active_events WHERE key = $1;
SQL
# Upserts the active row for key $1: inserts payload $2, or overwrites the
# stored payload when a row with that key already exists.
SET_EVENT_ACTIVE =
<<~SQL
  INSERT INTO dp_active_events (key, payload) VALUES ($1, $2)
  ON CONFLICT (key) DO UPDATE SET payload = EXCLUDED.payload;
SQL
# Reads the payload of the active row stored under key $1.
SELECT_ACTIVE_EVENT =
<<~SQL
  SELECT payload FROM dp_active_events WHERE key = $1;
SQL

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(operation, subject_class, subject_id, transaction_id) ⇒ Queue

Returns a new instance of Queue.



32
33
34
35
# File 'lib/dirty_pipeline/pg/queue.rb', line 32

# Composes the root key string for this queue and stores it in @root.
#
# The key has the shape
# "dirty-pipeline-queue:<subject_class>:<subject_id>:op_<operation>:txid_<transaction_id>",
# with each argument converted through string interpolation.
#
# @param operation [#to_s] value placed after "op_"
# @param subject_class [#to_s] value placed after the fixed prefix
# @param subject_id [#to_s] value placed after the subject class
# @param transaction_id [#to_s] value placed after "txid_"
def initialize(operation, subject_class, subject_id, transaction_id)
  subject_part = "dirty-pipeline-queue:#{subject_class}:#{subject_id}:"
  scope_part = "op_#{operation}:txid_#{transaction_id}"
  @root = subject_part + scope_part
end

Class Method Details

.create!(connection) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/dirty_pipeline/pg/queue.rb', line 6

# Creates the database objects the queue statements operate on:
# the dp_active_events table (one row per key), the
# dp_event_queues_id_seq sequence, and the dp_event_queues table whose id
# defaults to the next sequence value.
#
# The statements carry no IF NOT EXISTS clause.
#
# @param connection [#exec] object that receives the whole DDL script in a
#   single +exec+ call
# @return [Object] whatever +connection.exec+ returns
def self.create!(connection)
  schema_ddl = <<~SQL
    CREATE TABLE dp_active_events (
      key TEXT CONSTRAINT primary_event_queues_key PRIMARY KEY,
      payload TEXT,
      created_at TIMESTAMP NOT NULL DEFAULT now()
    );

    CREATE SEQUENCE dp_event_queues_id_seq START 1;
    CREATE TABLE dp_event_queues (
      id BIGINT PRIMARY KEY DEFAULT nextval('dp_event_queues_id_seq'),
      key TEXT NOT NULL,
      payload TEXT,
      created_at TIMESTAMP NOT NULL DEFAULT now()
    );
  SQL

  connection.exec(schema_ddl)
end

.destroy!(connection) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/dirty_pipeline/pg/queue.rb', line 24

# Drops both queue tables and then the id sequence. Every statement uses
# IF EXISTS, so missing objects are skipped rather than raising.
#
# @param connection [#exec] object that receives the whole script in a
#   single +exec+ call
# @return [Object] whatever +connection.exec+ returns
def self.destroy!(connection)
  teardown_ddl = <<~SQL
    DROP TABLE IF EXISTS dp_active_events;
    DROP TABLE IF EXISTS dp_event_queues;
    DROP SEQUENCE IF EXISTS dp_event_queues_id_seq;
  SQL

  connection.exec(teardown_ddl)
end

Instance Method Details

#clear! ⇒ Object



48
49
50
51
52
53
54
55
# File 'lib/dirty_pipeline/pg/queue.rb', line 48

# Empties this queue: inside one transaction, deletes the active-event row
# and then every queued row belonging to this queue's keys.
#
# @return [Object] whatever +with_postgres+ returns for the block
def clear!
  # Both deletes run on the object yielded by +transaction+, in this order.
  purge = proc do |tx|
    tx.exec(DELETE_ACTIVE, [active_event_key])
    tx.exec(DELETE_EVENTS, [events_queue_key])
  end

  with_postgres { |connection| connection.transaction(&purge) }
end

#pop ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/dirty_pipeline/pg/queue.rb', line 106

# Takes the next event off the queue inside a single transaction.
#
# The row with the greatest id for this queue is read. When a payload is
# found, that row is deleted and its payload is stored as the active event.
# When no payload is found, the active-event row is deleted instead.
#
# @return [Object] the result of +unpack+ applied to the raw payload
#   (+unpack+ is called with +nil+ when nothing was found)
def pop
  with_postgres do |connection|
    connection.transaction do |tx|
      row = PG.multi(tx.exec(SELECT_LAST_EVENT, [events_queue_key]))
      event_id, raw_event = row

      # Nothing queued: clear the active marker and finish early.
      if raw_event.nil?
        tx.exec(DELETE_ACTIVE_EVENT, [active_event_key])
        next unpack(raw_event)
      end

      # Move the row from the queue table to the active slot.
      tx.exec(DELETE_EVENT, [events_queue_key, event_id])
      tx.exec(SET_EVENT_ACTIVE, [active_event_key, raw_event])
      unpack(raw_event)
    end
  end
end

#processing_event ⇒ Object



125
126
127
128
129
130
131
132
# File 'lib/dirty_pipeline/pg/queue.rb', line 125

# Reads the payload stored in the active-event row for this queue and
# returns it after passing it through +unpack+. Nothing is modified.
#
# @return [Object] the result of +unpack+ for the value +PG.single+ extracts
def processing_event
  with_postgres do |connection|
    result = connection.exec(SELECT_ACTIVE_EVENT, [active_event_key])
    unpack(PG.single(result))
  end
end

#push(event) ⇒ Object Also known as: <<



72
73
74
75
76
77
78
# File 'lib/dirty_pipeline/pg/queue.rb', line 72

# Adds an event using the PUSH_EVENT statement, which stores it with a
# negated sequence id.
#
# @param event [Object] value handed to +pack+ before being stored
# @return [self] the queue itself, so calls can be chained
def push(event)
  with_postgres do |connection|
    params = [events_queue_key, pack(event)]
    connection.exec(PUSH_EVENT, params)
  end

  self
end

#to_a ⇒ Object



60
61
62
63
64
65
66
# File 'lib/dirty_pipeline/pg/queue.rb', line 60

# Lists every queued event for this queue without removing anything.
# Rows come back in the order SELECT_ALL_EVENTS defines (greatest id first);
# the first column of each row is passed through +unpack+.
#
# @return [Array] unpacked events
def to_a
  with_postgres do |connection|
    rows = connection.exec(SELECT_ALL_EVENTS, [events_queue_key]).to_a
    rows.map! { |row| unpack(row.values.first) }
  end
end

#unshift(event) ⇒ Object



84
85
86
87
88
89
# File 'lib/dirty_pipeline/pg/queue.rb', line 84

# Adds an event using the UNSHIFT_EVENT statement, which lets the id column
# take its default value.
#
# @param event [Object] value handed to +pack+ before being stored
# @return [self] the queue itself, so calls can be chained
def unshift(event)
  with_postgres do |connection|
    params = [events_queue_key, pack(event)]
    connection.exec(UNSHIFT_EVENT, params)
  end

  self
end

#with_postgres(&block) ⇒ Object



37
38
39
# File 'lib/dirty_pipeline/pg/queue.rb', line 37

# Hands the given block to DirtyPipeline.with_postgres and returns whatever
# that call returns. The other methods in this class call +exec+ and
# +transaction+ on the object the block receives.
#
# @yield [connection] the object DirtyPipeline.with_postgres yields
# @return [Object] the return value of DirtyPipeline.with_postgres
def with_postgres(&block)
  DirtyPipeline.with_postgres(&block)
end