Class: DirtyPipeline::PG::Railway
- Inherits: Object
- Inheritance chain: Object → DirtyPipeline::PG::Railway
- Defined in:
- lib/dirty_pipeline/pg/railway.rb
Constant Summary collapse
- DEFAULT_OPERATIONS =
%w(call undo finalize finalize_undo)
- DELETE_OPERATION =
<<~SQL
  DELETE FROM dp_active_operations WHERE key = $1;
SQL
- DELETE_TRANSACTION =
<<~SQL
  DELETE FROM dp_active_transactions WHERE key = $1;
SQL
- SWITCH_OPERATION =
<<~SQL
  INSERT INTO dp_active_operations (key, name)
  VALUES ($1, $2)
  ON CONFLICT (key) DO UPDATE SET name = EXCLUDED.name;
SQL
- SELECT_OPERATION =
<<~SQL
  SELECT name FROM dp_active_operations WHERE key = $1;
SQL
- SELECT_TRANSACTION =
<<~SQL
  SELECT name FROM dp_active_transactions WHERE key = $1;
SQL
Class Method Summary collapse
- .create!(connection) ⇒ Object
- .destroy!(connection) ⇒ Object
Instance Method Summary collapse
- #active ⇒ Object (also: #operation)
- #clear! ⇒ Object
- #initialize(subject, transaction_id) ⇒ Railway (constructor): returns a new instance of Railway.
- #next ⇒ Object
- #other_transaction_in_progress? ⇒ Boolean
- #queue(operation_name = active) ⇒ Object (also: #[])
- #running_transaction ⇒ Object
- #switch_to(name) ⇒ Object
- #with_postgres(&block) ⇒ Object
Constructor Details
#initialize(subject, transaction_id) ⇒ Railway
Returns a new instance of Railway.
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 28
#
# Sets up a railway for one subject inside one transaction.
#
# Stores the transaction id, the subject's class name and id (both as
# strings) and a root key prefix built from them, then creates one queue
# for every entry of DEFAULT_OPERATIONS.
#
# @param subject [#id] object whose class and id identify this rail
# @param transaction_id [Object] identifier kept as @tx_id and later
#   compared against the recorded running transaction
def initialize(subject, transaction_id)
  @tx_id = transaction_id
  @subject_class = subject.class.to_s
  @subject_id = subject.id.to_s
  @root = "dirty-pipeline-rail:#{subject.class}:#{subject.id}:"
  # Queues are built last, after the identifying state above is in place
  # (create_queue presumably reads it — TODO confirm against its definition).
  @queues = DEFAULT_OPERATIONS.each_with_object({}) do |operation, queues|
    queues[operation] = create_queue(operation)
  end
end
Class Method Details
.create!(connection) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 6
#
# Creates the two bookkeeping tables, dp_active_operations and
# dp_active_transactions. Each has a TEXT primary key `key`, a TEXT `name`
# and a `created_at` timestamp defaulting to now().
#
# Both statements are sent to the server in a single exec call. The tables
# are created with plain CREATE TABLE (no IF NOT EXISTS).
#
# @param connection [#exec] object the SQL is executed on
# @return [Object] whatever connection.exec returns
def self.create!(connection)
  schema = <<~SQL
    CREATE TABLE dp_active_operations (
      key TEXT CONSTRAINT primary_dp_active_operations_key PRIMARY KEY,
      name TEXT,
      created_at TIMESTAMP NOT NULL DEFAULT now()
    );
    CREATE TABLE dp_active_transactions (
      key TEXT CONSTRAINT primary_dp_active_tx_key PRIMARY KEY,
      name TEXT,
      created_at TIMESTAMP NOT NULL DEFAULT now()
    );
  SQL
  connection.exec(schema)
end
.destroy!(connection) ⇒ Object
21 22 23 24 25 26 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 21
#
# Drops the dp_active_operations and dp_active_transactions tables.
# Uses DROP TABLE IF EXISTS, so a missing table is not an error.
#
# @param connection [#exec] object the SQL is executed on
# @return [Object] whatever connection.exec returns
def self.destroy!(connection)
  teardown = <<~SQL
    DROP TABLE IF EXISTS dp_active_operations;
    DROP TABLE IF EXISTS dp_active_transactions;
  SQL
  connection.exec(teardown)
end
Instance Method Details
#active ⇒ Object Also known as: operation
93 94 95 96 97 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 93
#
# Looks up the operation currently recorded for this rail.
#
# Runs SELECT_OPERATION with active_operation_key as its only bind
# parameter and passes the result through PG.single.
#
# @return [Object] the value PG.single extracts from the result
#   (presumably the operation name, or nil when no row exists — verify
#   against PG.single)
def active
  with_postgres do |conn|
    rows = conn.exec(SELECT_OPERATION, [active_operation_key])
    PG.single(rows)
  end
end
#clear! ⇒ Object
50 51 52 53 54 55 56 57 58 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 50
#
# Resets the rail: clears every known queue, then deletes the rows for
# the active operation and the active transaction.
#
# The two DELETE statements run inside one connection-level transaction
# block so they are applied together.
def clear!
  @queues.values.each { |rail_queue| rail_queue.clear! }

  with_postgres do |conn|
    conn.transaction do |tx|
      tx.exec(DELETE_OPERATION, [active_operation_key])
      tx.exec(DELETE_TRANSACTION, [active_transaction_key])
    end
  end
end
#next ⇒ Object
60 61 62 63 64 65 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 60
#
# Pops the next event from the queue of the active operation.
#
# Returns nil without touching the queue when a different transaction is
# recorded as running. Otherwise a transaction is started if none is
# recorded yet. When the queue yields nil, the transaction is finished.
#
# @return [Object, nil] the popped event, or nil
def next
  return if other_transaction_in_progress?

  start_transaction! unless running_transaction

  event = queue.pop
  # A nil pop means there is nothing left to hand out for this transaction.
  finish_transaction! if event.nil?
  event
end
#other_transaction_in_progress? ⇒ Boolean
109 110 111 112 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 109
#
# Tells whether a transaction other than this railway's own (@tx_id) is
# currently recorded as running.
#
# The running transaction is read exactly once. Reading it twice costs an
# extra query and can give a wrong answer: if the row is cleared between
# the reads, the nil check passes and the comparison then sees nil,
# reporting a foreign transaction when none is running.
#
# @return [Boolean] false when no transaction is recorded or the recorded
#   one is ours; true otherwise
def other_transaction_in_progress?
  current = running_transaction
  return false if current.nil?

  current != @tx_id
end
#queue(operation_name = active) ⇒ Object Also known as: []
67 68 69 70 71 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 67
#
# Returns the queue for the given operation, creating and caching it on
# first use.
#
# The name is normalised to a String once and that same key is used for
# both lookup and storage. Looking up by String while storing under the
# raw name meant a Symbol name was never found again, so every call built
# a new queue.
#
# @param operation_name [#to_s] operation whose queue is wanted; defaults
#   to the currently active operation
# @return [Object] the queue created by create_queue for that operation
def queue(operation_name = active)
  key = operation_name.to_s
  @queues.fetch(key) { @queues[key] = create_queue(key) }
end
#running_transaction ⇒ Object
103 104 105 106 107 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 103
#
# Looks up the transaction currently recorded for this rail.
#
# Runs SELECT_TRANSACTION with active_transaction_key as its only bind
# parameter and passes the result through PG.single.
#
# @return [Object] the value PG.single extracts from the result
#   (presumably the transaction id, or nil when no row exists — verify
#   against PG.single)
def running_transaction
  with_postgres do |conn|
    rows = conn.exec(SELECT_TRANSACTION, [active_transaction_key])
    PG.single(rows)
  end
end
#switch_to(name) ⇒ Object
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 79
#
# Records `name` as the active operation for this rail.
#
# The name is converted to a String once and that value is used for
# validation, for the "already active" check and as the bind parameter,
# so a Symbol argument is stored exactly as it was validated.
#
# @param name [#to_s] one of DEFAULT_OPERATIONS
# @return [Object, nil] nil when `name` is already active, otherwise the
#   result of the upsert
# @raise [ArgumentError] when `name` is not one of DEFAULT_OPERATIONS
def switch_to(name)
  operation = name.to_s
  unless DEFAULT_OPERATIONS.include?(operation)
    raise ArgumentError,
          "unknown operation #{operation.inspect}, " \
          "expected one of: #{DEFAULT_OPERATIONS.join(', ')}"
  end
  # Skip the write when nothing would change.
  return if operation == active

  with_postgres do |conn|
    # SWITCH_OPERATION is a single upsert statement, so no explicit
    # transaction wrapper is used here.
    conn.exec(SWITCH_OPERATION, [active_operation_key, operation])
  end
end
#with_postgres(&block) ⇒ Object
40 41 42 |
# File 'lib/dirty_pipeline/pg/railway.rb', line 40 def with_postgres(&block) DirtyPipeline.with_postgres(&block) end |