Module: QC::Conn

Extended by:
Conn
Included in:
Conn
Defined in:
lib/queue_classic/conn.rb

Instance Method Summary collapse

Instance Method Details

#connect ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/queue_classic/conn.rb', line 71

# Opens a new database connection using the URL returned by db_url.
#
# Refers to PG::Connection, the same constant that connection= checks
# against, rather than the legacy PGconn alias (removed in pg 1.0).
#
# @return [Object] the connection returned by PG::Connection.connect, after
#   its application_name has been set from QC::APP_NAME
def connect
  log(:at => "establish_conn")
  conn = PG::Connection.connect(*normalize_db_url(db_url))
  # A non-OK status is only logged; the SET below is still attempted.
  log(:error => conn.error) unless conn.status == PG::Connection::CONNECTION_OK
  conn.exec("SET application_name = '#{QC::APP_NAME}'")
  conn
end

#connection ⇒ Object



52
53
54
# File 'lib/queue_classic/conn.rb', line 52

# Returns the memoized connection, calling connect the first time it is
# needed (or whenever @connection is currently nil/false).
#
# @return [Object] the value stored in @connection
def connection
  return @connection if @connection

  @connection = connect
end

#connection=(connection) ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/queue_classic/conn.rb', line 56

# Installs an externally created connection.
#
# @param connection [PG::Connection] the connection to store in @connection
# @raise [ArgumentError] if the argument is not a PG::Connection
def connection=(connection)
  if connection.is_a?(PG::Connection)
    @connection = connection
  else
    raise ArgumentError,
          "connection must be an instance of PG::Connection, but was #{connection.class}"
  end
end

#db_url ⇒ Object



95
96
97
98
99
100
101
# File 'lib/queue_classic/conn.rb', line 95

# Returns the parsed database URL, memoized in @db_url.
#
# QC_DATABASE_URL takes precedence over DATABASE_URL.
#
# @return [URI] the result of URI.parse on the chosen variable
# @raise [ArgumentError] if neither environment variable is set
def db_url
  @db_url ||= begin
    raw = ENV["QC_DATABASE_URL"] || ENV["DATABASE_URL"]
    raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL") unless raw

    URI.parse(raw)
  end
end

#disconnect ⇒ Object



65
66
67
68
69
# File 'lib/queue_classic/conn.rb', line 65

# Closes the current connection, if there is one, and forgets it.
#
# Works on @connection directly instead of the lazy connection accessor, so
# disconnecting while not connected does not open a new connection just to
# close it again.
#
# @return [nil, Object] nil when there was no connection, otherwise whatever
#   the connection's finish returns
def disconnect
  @connection.finish if @connection
ensure
  # Always drop the reference, even if finish raised.
  @connection = nil
end

#execute(stmt, *params) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/queue_classic/conn.rb', line 8

# Runs a SQL statement on the shared connection while holding @exec_mutex.
#
# Rescues PG::Error (same namespace as the PG::Connection constant used by
# connection=) rather than the legacy PGError alias (removed in pg 1.0).
#
# @param stmt [String] SQL text; its inspect form is logged
# @param params [Array] bind values; nil is passed to exec when none given
# @return [nil, Object, Array] nil when the result has no rows, the single
#   row when it has exactly one, otherwise an array of all rows
# @raise [PG::Error] re-raised after logging the error and disconnecting
def execute(stmt, *params)
  @exec_mutex.synchronize do
    log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      bind_values = params.empty? ? nil : params
      rows = connection.exec(stmt, bind_values).to_a
      rows.length > 1 ? rows : rows.first
    rescue PG::Error => e
      log(:error => e.inspect)
      # Drop the connection so the next call starts from a fresh one.
      disconnect
      raise
    end
  end
end

#normalize_db_url(url) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/queue_classic/conn.rb', line 81

# Converts a parsed URL into the positional argument list that connect
# splats into the connection constructor.
#
# @param url [#host, #port, #path, #user, #password] parsed database URL
# @return [Array] [host, port, nil, '', database, user, password]
def normalize_db_url(url)
  # The host may be a percent-encoded socket path; restore its slashes.
  host = url.host
  host &&= host.gsub(/%2F/i, '/')

  port = url.port || 5432
  database = url.path.gsub("/", "") # database name with every slash removed

  # Third and fourth slots are opts and tty.
  [host, port, nil, '', database, url.user, url.password]
end

#notify(chan) ⇒ Object



25
26
27
28
# File 'lib/queue_classic/conn.rb', line 25

# Issues a NOTIFY for the given channel through execute.
#
# The channel is sent as a double-quoted identifier (the quotes matter).
# Any double quote inside the name is doubled so it stays part of the
# identifier instead of terminating it, and non-String names such as
# Symbols are accepted via to_s.
#
# @param chan [String, Symbol] channel name
# @return [Object] whatever execute returns
def notify(chan)
  log(:at => "NOTIFY")
  quoted_channel = '"' + chan.to_s.gsub('"', '""') + '"'
  execute("NOTIFY #{quoted_channel}")
end

#transaction ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/queue_classic/conn.rb', line 37

# Runs the given block between BEGIN and COMMIT.
#
# On any exception (including non-StandardError ones) a ROLLBACK is
# executed and the exception is re-raised.
#
# @yield the work to perform inside the transaction
# @return [Object] the result of execute("COMMIT"), not the block's value
def transaction
  execute("BEGIN")
  yield
  execute("COMMIT")
rescue Exception
  # Deliberately broad: roll back on every failure, then propagate it.
  execute("ROLLBACK")
  raise
end

#transaction_idle? ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/queue_classic/conn.rb', line 48

# Reports whether the connection's transaction_status equals PQTRANS_IDLE.
#
# Looks the constant up on PG::Connection, the same constant connection=
# checks against, rather than the legacy PGconn alias (removed in pg 1.0).
#
# @return [Boolean]
def transaction_idle?
  connection.transaction_status == PG::Connection::PQTRANS_IDLE
end

#wait(chan, t) ⇒ Object



30
31
32
33
34
35
# File 'lib/queue_classic/conn.rb', line 30

# Listens on a channel, waits for a notification, then stops listening.
#
# Calls listen, wait_for_notify, unlisten and drain_notify in that order.
#
# @param chan [Object] channel passed to listen and unlisten
# @param t [Object] passed to wait_for_notify (presumably a timeout; confirm
#   against that helper)
# @return [Object] whatever drain_notify returns
def wait(chan, t)
  listen(chan)
  wait_for_notify(t)

  # Stop listening before draining.
  unlisten(chan)
  drain_notify
end