Class: DirtyPipeline::PG::Railway

Inherits:
Object
  • Object
show all
Defined in:
lib/dirty_pipeline/pg/railway.rb

Constant Summary collapse

# Operation names for which #initialize pre-builds a queue and which
# #switch_to accepts. Frozen so the shared list cannot be mutated at runtime.
DEFAULT_OPERATIONS = %w[call undo finalize finalize_undo].freeze

# Deletes the dp_active_operations row stored under key $1.
DELETE_OPERATION = <<~SQL.freeze
  DELETE FROM dp_active_operations WHERE key = $1;
SQL

# Deletes the dp_active_transactions row stored under key $1.
DELETE_TRANSACTION = <<~SQL.freeze
  DELETE FROM dp_active_transactions WHERE key = $1;
SQL

# Upserts the operation name ($2) for key $1: inserts a new row, or
# overwrites the name when a row with that key already exists.
SWITCH_OPERATION = <<~SQL.freeze
  INSERT INTO dp_active_operations (key, name) VALUES ($1, $2)
  ON CONFLICT (key)
  DO UPDATE SET name = EXCLUDED.name;
SQL

# Reads the name column of the dp_active_operations row for key $1.
SELECT_OPERATION = <<~SQL.freeze
  SELECT name FROM dp_active_operations WHERE key = $1;
SQL

# Reads the name column of the dp_active_transactions row for key $1.
SELECT_TRANSACTION = <<~SQL.freeze
  SELECT name FROM dp_active_transactions WHERE key = $1;
SQL

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subject, transaction_id) ⇒ Railway

Returns a new instance of Railway.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/dirty_pipeline/pg/railway.rb', line 28

# Sets up the railway for one subject within one transaction.
#
# The subject's class name and id are recorded as strings and embedded in
# the @root prefix; a queue is then created for every default operation.
#
# @param subject [#id] object whose class and id identify the railway
# @param transaction_id [Object] identifier stored as this railway's transaction
def initialize(subject, transaction_id)
  @tx_id = transaction_id
  @subject_class = subject.class.to_s
  @subject_id = subject.id.to_s
  @root = "dirty-pipeline-rail:#{@subject_class}:#{@subject_id}:"
  # Identity fields are assigned first; the queues are built afterwards,
  # one per default operation and keyed by the operation name.
  @queues = DEFAULT_OPERATIONS.each_with_object({}) do |name, queues|
    queues[name] = create_queue(name)
  end
end

Class Method Details

.create!(connection) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/dirty_pipeline/pg/railway.rb', line 6

# Creates the dp_active_operations and dp_active_transactions tables.
#
# Both CREATE TABLE statements are sent in a single +exec+ call, and neither
# carries an IF NOT EXISTS guard.
#
# @param connection [#exec] connection used to run the SQL
# @return [Object] whatever +connection.exec+ returns
def self.create!(connection)
  schema = <<~SQL
    CREATE TABLE dp_active_operations (
      key TEXT CONSTRAINT primary_dp_active_operations_key PRIMARY KEY,
      name TEXT,
      created_at TIMESTAMP NOT NULL DEFAULT now()
    );
    CREATE TABLE dp_active_transactions (
      key TEXT CONSTRAINT primary_dp_active_tx_key PRIMARY KEY,
      name TEXT,
      created_at TIMESTAMP NOT NULL DEFAULT now()
    );
  SQL
  connection.exec(schema)
end

.destroy!(connection) ⇒ Object



21
22
23
24
25
26
# File 'lib/dirty_pipeline/pg/railway.rb', line 21

# Drops the dp_active_operations and dp_active_transactions tables.
#
# Each DROP uses IF EXISTS, and both statements are sent in one +exec+ call.
#
# @param connection [#exec] connection used to run the SQL
# @return [Object] whatever +connection.exec+ returns
def self.destroy!(connection)
  teardown = <<~SQL
    DROP TABLE IF EXISTS dp_active_operations;
    DROP TABLE IF EXISTS dp_active_transactions;
  SQL
  connection.exec(teardown)
end

Instance Method Details

#active ⇒ Object Also known as: operation



93
94
95
96
97
# File 'lib/dirty_pipeline/pg/railway.rb', line 93

# Looks up the active operation recorded for this railway.
#
# @return [Object] the result of PG.single applied to the SELECT_OPERATION
#   query run with this railway's active-operation key
def active
  with_postgres do |conn|
    rows = conn.exec(SELECT_OPERATION, [active_operation_key])
    PG.single(rows)
  end
end

#clear! ⇒ Object



50
51
52
53
54
55
56
57
58
# File 'lib/dirty_pipeline/pg/railway.rb', line 50

# Resets the railway: clears every queue, then removes the active-operation
# and active-transaction rows inside one database transaction.
#
# The queues are cleared before the database transaction is opened.
#
# @return [Object] whatever the with_postgres block yields back
def clear!
  @queues.each_value(&:clear!)

  with_postgres do |conn|
    conn.transaction do |tx|
      tx.exec(DELETE_OPERATION, [active_operation_key])
      tx.exec(DELETE_TRANSACTION, [active_transaction_key])
    end
  end
end

#next ⇒ Object



60
61
62
63
64
65
# File 'lib/dirty_pipeline/pg/railway.rb', line 60

# Pops the next event from the active operation's queue.
#
# Does nothing and returns nil while a different transaction is in progress.
# Otherwise starts a transaction if none is running, pops one event, and
# finishes the transaction once the queue yields nil.
#
# @return [Object, nil] the popped event, or nil
def next
  return if other_transaction_in_progress?

  start_transaction! unless running_transaction

  event = queue.pop
  finish_transaction! if event.nil?
  event
end

#other_transaction_in_progress? ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
112
# File 'lib/dirty_pipeline/pg/railway.rb', line 109

# Tells whether a transaction other than this railway's own is running.
#
# The running transaction is read exactly once, so the nil check and the
# comparison see the same value and only one lookup is made.
#
# @return [Boolean] false when no transaction is running or the running one
#   is @tx_id; true otherwise
def other_transaction_in_progress?
  current = running_transaction
  !current.nil? && current != @tx_id
end

#queue(operation_name = active) ⇒ Object Also known as: []



67
68
69
70
71
# File 'lib/dirty_pipeline/pg/railway.rb', line 67

# Returns the queue for the given operation, creating and caching it on
# first use.
#
# The name is normalised to a String once and used for both lookup and
# storage, so Symbol and String arguments resolve to the same cached queue.
#
# @param operation_name [#to_s] operation to fetch the queue for; defaults to
#   the currently active operation
# @return [Object] the queue built by create_queue for that operation
def queue(operation_name = active)
  key = operation_name.to_s
  @queues.fetch(key) { @queues.store(key, create_queue(key)) }
end

#running_transaction ⇒ Object



103
104
105
106
107
# File 'lib/dirty_pipeline/pg/railway.rb', line 103

# Looks up the transaction currently recorded for this railway.
#
# @return [Object] the result of PG.single applied to the SELECT_TRANSACTION
#   query run with this railway's active-transaction key
def running_transaction
  with_postgres do |conn|
    rows = conn.exec(SELECT_TRANSACTION, [active_transaction_key])
    PG.single(rows)
  end
end

#switch_to(name) ⇒ Object

Raises:

  • (ArgumentError)


79
80
81
82
83
84
85
86
87
88
# File 'lib/dirty_pipeline/pg/railway.rb', line 79

# Makes the given operation the active one for this railway.
#
# The name is normalised to a String once, and that same value is validated,
# compared with the current operation and written to the database.
#
# @param name [#to_s] one of DEFAULT_OPERATIONS
# @return [Object, nil] nil when the operation is already active; otherwise
#   whatever the with_postgres block yields back
# @raise [ArgumentError] if the name is not one of DEFAULT_OPERATIONS
def switch_to(name)
  operation = name.to_s
  unless DEFAULT_OPERATIONS.include?(operation)
    raise ArgumentError,
          "unknown operation #{operation.inspect}, expected one of: #{DEFAULT_OPERATIONS.join(', ')}"
  end
  return if operation == active

  # SWITCH_OPERATION is a single upsert statement, so it is sent on its own
  # without an explicit transaction wrapper.
  with_postgres do |c|
    c.exec(SWITCH_OPERATION, [active_operation_key, operation])
  end
end

#with_postgres(&block) ⇒ Object



40
41
42
# File 'lib/dirty_pipeline/pg/railway.rb', line 40

# Delegates to DirtyPipeline.with_postgres, forwarding the caller's block
# unchanged, and returns whatever that call returns.
#
# @param block [Proc] block handed through to DirtyPipeline.with_postgres
def with_postgres(&block)
  DirtyPipeline.with_postgres(&block)
end