Module: QC::Conn
Instance Method Summary collapse
- #connect ⇒ Object
- #connection ⇒ Object
- #connection=(connection) ⇒ Object
- #db_url ⇒ Object
- #disconnect ⇒ Object
- #drain_notify ⇒ Object
- #execute(stmt, *params) ⇒ Object
- #listen(chan) ⇒ Object
- #log(msg) ⇒ Object
- #notify(chan) ⇒ Object
- #transaction ⇒ Object
- #transaction_idle? ⇒ Boolean
- #unlisten(chan) ⇒ Object
- #wait_for_notify(t) ⇒ Object
Instance Method Details
#connect ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/queue_classic/conn.rb', line 86
#
# Opens a new database connection using the components of #db_url and
# returns it. A "establish_conn" event is logged before connecting.
#
# The host component may be a percent-encoded socket path; every "%2F"
# (case-insensitive) is turned back into "/". A URL without a host
# component is passed to the driver as nil instead of raising
# NoMethodError on nil.gsub.
#
# If the returned connection does not report CONNECTION_OK, the driver's
# error text is logged; the connection object is still returned to the
# caller in that case.
#
# Returns the object produced by PG::Connection.connect.
def connect
  log(:at => "establish_conn")
  url = db_url

  # host or percent-encoded socket path; may be absent from the URL
  host = url.host
  host = host.gsub(/%2F/i, '/') if host

  conn = PG::Connection.connect(
    host,
    url.port || 5432,
    nil, '', # opts, tty
    url.path.gsub("/", ""), # database name
    url.user,
    url.password
  )
  if conn.status != PG::Connection::CONNECTION_OK
    log(:error => conn.error)
  end
  conn
end
#connection ⇒ Object
67 68 69 |
# File 'lib/queue_classic/conn.rb', line 67
#
# Returns the memoized connection, calling #connect to create and store
# one the first time (or whenever the stored value has been reset to nil).
def connection
  return @connection if @connection

  @connection = connect
end
#connection=(connection) ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'lib/queue_classic/conn.rb', line 71
#
# Replaces the stored connection with the one supplied by the caller.
#
# connection - must be exactly a PG::Connection (instance_of? is used, so
#              instances of subclasses are rejected as well).
#
# Raises ArgumentError naming the offending class when the check fails.
def connection=(connection)
  if connection.instance_of?(PG::Connection)
    @connection = connection
  else
    raise(ArgumentError, "connection must be an instance of PG::Connection, but was #{connection.class}")
  end
end
#db_url ⇒ Object
102 103 104 105 106 107 108 |
# File 'lib/queue_classic/conn.rb', line 102
#
# Returns the parsed database URL, memoized in @db_url.
#
# The URL is read from ENV["QC_DATABASE_URL"], falling back to
# ENV["DATABASE_URL"]. A variable that is set but empty or blank is
# treated the same as an unset one, so an empty QC_DATABASE_URL no longer
# shadows a valid DATABASE_URL or produces a host-less URI.
#
# Returns the object produced by URI.parse.
# Raises ArgumentError when neither variable holds a usable value.
def db_url
  return @db_url if @db_url

  candidates = [ENV["QC_DATABASE_URL"], ENV["DATABASE_URL"]]
  url = candidates.find { |value| value && !value.strip.empty? }
  raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL") unless url

  @db_url = URI.parse(url)
end
#disconnect ⇒ Object
80 81 82 83 84 |
# File 'lib/queue_classic/conn.rb', line 80
#
# Closes the stored connection, if there is one, and forgets it.
#
# The instance variable is inspected directly instead of going through
# #connection: that accessor lazily calls #connect, which would open a
# brand-new connection only to close it again (and would retry a failing
# connect when this method runs as part of error handling).
#
# @connection is reset to nil even when finish raises; the exception
# from finish still propagates to the caller.
def disconnect
  @connection.finish if @connection
ensure
  @connection = nil
end
#drain_notify ⇒ Object
40 41 42 43 44 |
# File 'lib/queue_classic/conn.rb', line 40
#
# Repeatedly calls notifies on the connection until it returns nil,
# logging a "drain_notifications" event for every non-nil result.
def drain_notify
  log(:at => "drain_notifications") until connection.notifies.nil?
end
#execute(stmt, *params) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/queue_classic/conn.rb', line 8
#
# Runs a SQL statement on the shared connection while holding
# @exec_mutex, logging the statement first.
#
# stmt   - SQL text.
# params - optional bind values; when none are given, nil is passed to
#          the driver in place of an empty array.
#
# Returns nil when the result has no rows, the single row when it has
# exactly one, and an Array of rows when it has more than one.
#
# On PG::Error the error is logged, #disconnect is called so the next
# call starts from a fresh connection, and the error is re-raised.
def execute(stmt, *params)
  @exec_mutex.synchronize do
    log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      bind_values = params.empty? ? nil : params
      rows = connection.exec(stmt, bind_values).to_a
      rows.length > 1 ? rows : rows.pop
    rescue PG::Error => e
      log(:error => e.inspect)
      disconnect
      raise
    end
  end
end
#listen(chan) ⇒ Object
30 31 32 33 |
# File 'lib/queue_classic/conn.rb', line 30
#
# Issues LISTEN for the given channel after logging a "LISTEN" event.
#
# chan - channel name; converted with to_s, so Symbols are accepted.
#
# The name is sent as a double-quoted identifier. Any double quote inside
# the name is doubled so it cannot terminate the identifier early.
#
# Returns whatever #execute returns.
def listen(chan)
  log(:at => "LISTEN")
  ident = chan.to_s.gsub('"', '""')
  execute(%(LISTEN "#{ident}")) #quotes matter
end
#log(msg) ⇒ Object
110 111 112 |
# File 'lib/queue_classic/conn.rb', line 110
#
# Forwards msg unchanged to QC.log and returns its result.
def log(msg)
  QC.log(msg)
end
#notify(chan) ⇒ Object
25 26 27 28 |
# File 'lib/queue_classic/conn.rb', line 25
#
# Issues NOTIFY for the given channel after logging a "NOTIFY" event.
#
# chan - channel name; converted with to_s, so Symbols are accepted.
#
# The name is sent as a double-quoted identifier. Any double quote inside
# the name is doubled so it cannot terminate the identifier early.
#
# Returns whatever #execute returns.
def notify(chan)
  log(:at => "NOTIFY")
  ident = chan.to_s.gsub('"', '""')
  execute(%(NOTIFY "#{ident}")) #quotes matter
end
#transaction ⇒ Object
52 53 54 55 56 57 58 59 60 61 |
# File 'lib/queue_classic/conn.rb', line 52
#
# Wraps the given block in BEGIN / COMMIT.
#
# If BEGIN, the block, or COMMIT raises anything, ROLLBACK is issued and
# the original exception is re-raised. A failure of the ROLLBACK itself
# is logged and otherwise ignored so that it cannot replace the exception
# that actually caused the transaction to abort.
#
# Returns whatever execute("COMMIT") returns (not the block's value).
def transaction
  execute("BEGIN")
  yield
  execute("COMMIT")
rescue Exception => original # deliberately broad: always re-raised below
  begin
    execute("ROLLBACK")
  rescue StandardError => rollback_error
    log(:error => rollback_error.inspect)
  end
  raise original
end
#transaction_idle? ⇒ Boolean
63 64 65 |
# File 'lib/queue_classic/conn.rb', line 63
#
# Returns true when the connection's transaction_status equals
# PQTRANS_IDLE, false otherwise.
#
# The constant is read through PG::Connection, the same class name that
# #connection= checks against, rather than the legacy PGconn alias.
def transaction_idle?
  connection.transaction_status == PG::Connection::PQTRANS_IDLE
end
#unlisten(chan) ⇒ Object
35 36 37 38 |
# File 'lib/queue_classic/conn.rb', line 35
#
# Issues UNLISTEN for the given channel after logging an "UNLISTEN" event.
#
# chan - channel name; converted with to_s, so Symbols are accepted.
#
# The name is sent as a double-quoted identifier. Any double quote inside
# the name is doubled so it cannot terminate the identifier early.
#
# Returns whatever #execute returns.
def unlisten(chan)
  log(:at => "UNLISTEN")
  ident = chan.to_s.gsub('"', '""')
  execute(%(UNLISTEN "#{ident}")) #quotes matter
end
#wait_for_notify(t) ⇒ Object
46 47 48 49 50 |
# File 'lib/queue_classic/conn.rb', line 46
#
# Delegates to the connection's wait_for_notify, logging a
# "received_notification" event from the block the driver yields to.
#
# t - forwarded unchanged to the driver (presumably a timeout; confirm
#     units against the driver documentation).
#
# The three values yielded by the driver are not used here.
def wait_for_notify(t)
  connection.wait_for_notify(t) { |_event, _pid, _msg| log(:at => "received_notification") }
end