Module: QC::Conn
Instance Method Summary collapse
- #connect ⇒ Object
- #connection ⇒ Object
- #db_url ⇒ Object
- #disconnect ⇒ Object
- #drain_notify ⇒ Object
- #execute(stmt, *params) ⇒ Object
- #listen(chan) ⇒ Object
- #log(msg) ⇒ Object
- #notify(chan) ⇒ Object
- #transaction ⇒ Object
- #transaction_idle? ⇒ Boolean
- #unlisten(chan) ⇒ Object
- #wait_for_notify(t) ⇒ Object
Instance Method Details
#connect ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/queue_classic/conn.rb', line 72 def connect log(:level => :debug, :action => "establish_conn") conn = PGconn.connect( db_url.host, db_url.port || 5432, nil, '', #opts, tty db_url.path.gsub("/",""), # database name db_url.user, db_url.password ) if conn.status != PGconn::CONNECTION_OK log(:level => :error, :message => conn.error) end conn end |
#connection ⇒ Object
62 63 64 |
# File 'lib/queue_classic/conn.rb', line 62 def connection @connection ||= connect end |
#db_url ⇒ Object
88 89 90 91 92 93 94 |
# File 'lib/queue_classic/conn.rb', line 88 def db_url return @db_url if @db_url url = ENV["QC_DATABASE_URL"] || ENV["DATABASE_URL"] || raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL") @db_url = URI.parse(url) end |
#disconnect ⇒ Object
66 67 68 69 70 |
# File 'lib/queue_classic/conn.rb', line 66 def disconnect connection.finish ensure @connection = nil end |
#drain_notify ⇒ Object
35 36 37 38 39 |
# File 'lib/queue_classic/conn.rb', line 35 def drain_notify until connection.notifies.nil? log(:level => :debug, :action => "drain_notifications") end end |
#execute(stmt, *params) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/queue_classic/conn.rb', line 5 def execute(stmt, *params) log(:level => :debug, :action => "exec_sql", :sql => stmt.inspect) begin params = nil if params.empty? r = connection.exec(stmt, params) result = [] r.each {|t| result << t} result.length > 1 ? result : result.pop rescue PGError => e log(:error => e.inspect) disconnect raise end end |
#listen(chan) ⇒ Object
25 26 27 28 |
# File 'lib/queue_classic/conn.rb', line 25 def listen(chan) log(:level => :debug, :action => "LISTEN") execute('LISTEN "' + chan + '"') #quotes matter end |
#log(msg) ⇒ Object
96 97 98 |
# File 'lib/queue_classic/conn.rb', line 96 def log(msg) QC.log(msg) end |
#notify(chan) ⇒ Object
20 21 22 23 |
# File 'lib/queue_classic/conn.rb', line 20 def notify(chan) log(:level => :debug, :action => "NOTIFY") execute('NOTIFY "' + chan + '"') #quotes matter end |
#transaction ⇒ Object
47 48 49 50 51 52 53 54 55 56 |
# File 'lib/queue_classic/conn.rb', line 47 def transaction begin execute("BEGIN") yield execute("COMMIT") rescue Exception execute("ROLLBACK") raise end end |
#transaction_idle? ⇒ Boolean
58 59 60 |
# File 'lib/queue_classic/conn.rb', line 58 def transaction_idle? connection.transaction_status == PGconn::PQTRANS_IDLE end |
#unlisten(chan) ⇒ Object
30 31 32 33 |
# File 'lib/queue_classic/conn.rb', line 30 def unlisten(chan) log(:level => :debug, :action => "UNLISTEN") execute('UNLISTEN "' + chan + '"') #quotes matter end |
#wait_for_notify(t) ⇒ Object
41 42 43 44 45 |
# File 'lib/queue_classic/conn.rb', line 41 def wait_for_notify(t) connection.wait_for_notify(t) do |event, pid, msg| log(:level => :debug, :action => "received_notification") end end |