Module: QC::Conn
Instance Method Summary
- #connect ⇒ Object
- #connection ⇒ Object
- #connection=(connection) ⇒ Object
- #db_url ⇒ Object
- #disconnect ⇒ Object
- #execute(stmt, *params) ⇒ Object
- #normalize_db_url(url) ⇒ Object
- #notify(chan) ⇒ Object
- #transaction ⇒ Object
- #transaction_idle? ⇒ Boolean
- #wait(chan, t) ⇒ Object
Instance Method Details
#connect ⇒ Object
71 72 73 74 75 76 77 78 79 |
# File 'lib/queue_classic/conn.rb', line 71
# Opens a new database connection from db_url and tags it with the
# application name.
#
# Uses PG::Connection (the same constant connection= validates against)
# instead of the deprecated PGconn alias.
#
# Returns the newly opened connection object.
def connect
  log(:at => "establish_conn")
  conn = PG::Connection.connect(*normalize_db_url(db_url))
  # NOTE(review): a non-OK status is only logged; the SET below is still
  # attempted on that connection, as in the original code.
  if conn.status != PG::Connection::CONNECTION_OK
    log(:error => conn.error)
  end
  # SET takes no bind parameters, so the name is escaped before being
  # embedded in the single-quoted literal.
  app_name = conn.escape_string(QC::APP_NAME)
  conn.exec("SET application_name = '#{app_name}'")
  conn
end
#connection ⇒ Object
52 53 54 |
# File 'lib/queue_classic/conn.rb', line 52
# Returns the memoized connection, calling connect the first time it is
# needed (or after @connection has been reset to nil).
def connection
  return @connection if @connection

  @connection = connect
end
#connection=(connection) ⇒ Object
56 57 58 59 60 61 62 63 |
# File 'lib/queue_classic/conn.rb', line 56
# Installs an externally created connection as the one this module uses.
#
# connection - must be a PG::Connection.
#
# Raises ArgumentError naming the offending class for anything else.
def connection=(connection)
  if connection.is_a?(PG::Connection)
    @connection = connection
  else
    raise ArgumentError,
          "connection must be an instance of PG::Connection, but was #{connection.class}"
  end
end
#db_url ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/queue_classic/conn.rb', line 95
# Returns the parsed database URL, memoized in @db_url.
#
# QC_DATABASE_URL takes precedence over DATABASE_URL. A variable that is
# set but blank is treated as missing, so an empty QC_DATABASE_URL no
# longer hides a usable DATABASE_URL.
#
# Raises ArgumentError when neither variable holds a value.
def db_url
  return @db_url if @db_url

  url = %w[QC_DATABASE_URL DATABASE_URL]
        .map { |key| ENV[key] }
        .find { |value| value && !value.strip.empty? }
  raise ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL" unless url

  @db_url = URI.parse(url)
end
#disconnect ⇒ Object
65 66 67 68 69 |
# File 'lib/queue_classic/conn.rb', line 65
# Closes the current connection, if there is one, and forgets it.
#
# Reads @connection directly rather than going through the lazy
# `connection` accessor, so disconnecting while no connection exists does
# not open a fresh one just to close it. A handle that reports itself as
# already finished is not finished again.
#
# @connection is reset to nil even when finish raises.
def disconnect
  @connection.finish if @connection && !@connection.finished?
  nil
ensure
  @connection = nil
end
#execute(stmt, *params) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/queue_classic/conn.rb', line 8
# Runs a statement on the shared connection while holding @exec_mutex.
#
# stmt   - SQL string passed to connection.exec.
# params - optional bind values; nil is passed when none are given.
#
# Returns nil when the statement yields no rows, the single row when it
# yields exactly one, and an Array of rows otherwise.
#
# On PG::Error the error is logged, disconnect is called, and the error is
# re-raised. PG::Error is used instead of the deprecated PGError alias, in
# line with the PG::Connection check in connection=.
def execute(stmt, *params)
  @exec_mutex.synchronize do
    log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      params = nil if params.empty?
      rows = connection.exec(stmt, params).to_a
      rows.length > 1 ? rows : rows.first
    rescue PG::Error => e
      log(:error => e.inspect)
      disconnect
      raise
    end
  end
end
#normalize_db_url(url) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/queue_classic/conn.rb', line 81
# Converts a parsed URL into the positional argument list that connect
# splats into the connection constructor:
#   [host, port, opts, tty, dbname, user, password]
#
# url - object responding to host, port, path, user and password
#       (db_url passes the result of URI.parse).
#
# A percent-encoded "/" (%2F) in the host is decoded so a socket directory
# can be given as the host. The port defaults to 5432. A nil path yields an
# empty database name instead of raising NoMethodError.
def normalize_db_url(url)
  host = url.host
  host = host.gsub(/%2F/i, '/') if host
  [
    host,                      # host or percent-encoded socket path
    url.port || 5432,
    nil, '',                   # opts, tty
    url.path.to_s.delete("/"), # database name
    url.user,
    url.password
  ]
end
#notify(chan) ⇒ Object
25 26 27 28 |
# File 'lib/queue_classic/conn.rb', line 25
# Issues a NOTIFY on the given channel.
#
# chan - channel name (String or Symbol).
#
# The name is sent as a double-quoted identifier; the quotes matter. Any
# double quote inside the name is doubled so it cannot terminate the
# identifier early.
def notify(chan)
  log(:at => "NOTIFY")
  quoted = chan.to_s.gsub('"', '""')
  execute(%(NOTIFY "#{quoted}"))
end
#transaction ⇒ Object
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/queue_classic/conn.rb', line 37
# Wraps the given block in BEGIN / COMMIT.
#
# If anything is raised between BEGIN and COMMIT (including by COMMIT
# itself), ROLLBACK is executed and the same exception is re-raised.
# Exception is rescued on purpose so the rollback also runs for
# non-StandardError exits; nothing is swallowed.
def transaction
  execute("BEGIN")
  yield
  execute("COMMIT")
rescue Exception
  execute("ROLLBACK")
  raise
end
#transaction_idle? ⇒ Boolean
48 49 50 |
# File 'lib/queue_classic/conn.rb', line 48
# Returns true when the connection reports the idle transaction status
# (PQTRANS_IDLE), false otherwise.
#
# The constant is read from PG::Connection rather than the deprecated
# PGconn alias, in line with the PG::Connection check in connection=.
def transaction_idle?
  connection.transaction_status == PG::Connection::PQTRANS_IDLE
end
#wait(chan, t) ⇒ Object
30 31 32 33 34 35 |
# File 'lib/queue_classic/conn.rb', line 30
# Listens on a channel, waits for a notification, then stops listening.
#
# chan - channel name passed to listen / unlisten.
# t    - passed straight to wait_for_notify (presumably a timeout;
#        confirm against wait_for_notify).
#
# unlisten runs in an ensure so the channel is not left registered when
# wait_for_notify raises. drain_notify runs only on the normal path.
#
# Returns whatever drain_notify returns.
def wait(chan, t)
  listen(chan)
  begin
    wait_for_notify(t)
  ensure
    unlisten(chan)
  end
  drain_notify
end