Module: QC::Conn

Extended by:
Conn
Included in:
Conn
Defined in:
lib/queue_classic/conn.rb

Instance Method Summary collapse

Instance Method Details

#connect ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/queue_classic/conn.rb', line 72

# Opens a new PostgreSQL connection to the database described by db_url.
#
# Host, port, database name, user and password are all read from the URL.
# The port falls back to 5432 when the URL does not carry one, and the
# database name is the URL path with every "/" removed.
#
# A connection whose status is not PGconn::CONNECTION_OK is reported through
# an error-level log entry, but it is still handed back to the caller.
#
# Returns whatever PGconn.connect produced.
def connect
  log(:level => :debug, :action => "establish_conn")
  url      = db_url
  opts     = nil # positional "options" argument, unused
  tty      = ''  # positional "tty" argument, unused
  database = url.path.delete("/")
  conn = PGconn.connect(url.host, url.port || 5432, opts, tty, database, url.user, url.password)
  unless conn.status == PGconn::CONNECTION_OK
    log(:level => :error, :message => conn.error)
  end
  conn
end

#connection ⇒ Object



62
63
64
# File 'lib/queue_classic/conn.rb', line 62

# Returns the memoized database connection, calling #connect to establish it
# when no connection is currently remembered.
def connection
  @connection = connect unless @connection
  @connection
end

#db_url ⇒ Object



88
89
90
91
92
93
94
# File 'lib/queue_classic/conn.rb', line 88

# Returns the parsed database URL, memoized after the first successful call.
#
# The URL is read from the QC_DATABASE_URL environment variable, falling back
# to DATABASE_URL when the former is not set.
#
# Raises ArgumentError when neither variable is set.
def db_url
  @db_url ||= begin
    url = ENV.values_at("QC_DATABASE_URL", "DATABASE_URL").compact.first
    raise ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL" if url.nil?
    URI.parse(url)
  end
end

#disconnect ⇒ Object



66
67
68
69
70
# File 'lib/queue_classic/conn.rb', line 66

# Closes the current database connection, if there is one, and forgets it so
# that the next call to #connection establishes a fresh one.
#
# The memoized handle is read directly rather than through #connection:
# #connection connects lazily, so going through it would open a brand-new
# connection just to close it again (and would raise if the database cannot
# be reached) whenever no connection is open.
#
# The memoized handle is cleared even when finishing it raises.
def disconnect
  @connection.finish if @connection
ensure
  @connection = nil
end

#drain_notify ⇒ Object



35
36
37
38
39
# File 'lib/queue_classic/conn.rb', line 35

# Consumes pending notifications from the connection one at a time, writing a
# debug log entry for each one, until the connection's notifies call returns
# nil.
def drain_notify
  log(:level => :debug, :action => "drain_notifications") until connection.notifies.nil?
end

#execute(stmt, *params) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/queue_classic/conn.rb', line 5

# Runs a SQL statement on the shared connection and returns its rows.
#
# stmt   - the SQL text to run.
# params - optional bind values; when none are given, nil is passed to the
#          connection's exec in their place.
#
# Returns nil when the statement produced no rows, the single row when it
# produced exactly one, and an Array of rows when it produced several.
#
# On PGError the error is logged, the connection is dropped via #disconnect,
# and the same error is re-raised.
def execute(stmt, *params)
  log(:level => :debug, :action => "exec_sql", :sql => stmt.inspect)
  begin
    bind_values = params.empty? ? nil : params
    rows = []
    connection.exec(stmt, bind_values).each { |row| rows << row }
    if rows.length > 1
      rows
    else
      # zero rows => nil, one row => that row
      rows.first
    end
  rescue PGError => err
    log(:error => err.inspect)
    disconnect
    raise
  end
end

#listen(chan) ⇒ Object



25
26
27
28
# File 'lib/queue_classic/conn.rb', line 25

# Subscribes the connection to the notification channel +chan+ by issuing a
# LISTEN statement through #execute.
#
# chan - the channel name; anything responding to to_s (String, Symbol).
#
# The name is sent as a double-quoted SQL identifier. Any double quote that
# is part of the name is doubled, which is how a quote is written inside a
# quoted identifier; without this a name containing '"' would terminate the
# identifier early and yield malformed (or injected) SQL.
#
# Returns the result of #execute.
def listen(chan)
  log(:level => :debug, :action => "LISTEN")
  identifier = chan.to_s.gsub('"', '""')
  execute('LISTEN "' + identifier + '"') #quotes matter
end

#log(msg) ⇒ Object



96
97
98
# File 'lib/queue_classic/conn.rb', line 96

# Hands +msg+ over to QC.log and returns whatever that call returns.
#
# msg - the log payload; elsewhere in this module it is a Hash such as
#       {:level => :debug, :action => "..."}.
def log(msg)
  QC.log msg
end

#notify(chan) ⇒ Object



20
21
22
23
# File 'lib/queue_classic/conn.rb', line 20

# Sends a notification on the channel +chan+ by issuing a NOTIFY statement
# through #execute.
#
# chan - the channel name; anything responding to to_s (String, Symbol).
#
# The name is sent as a double-quoted SQL identifier. Any double quote that
# is part of the name is doubled, which is how a quote is written inside a
# quoted identifier; without this a name containing '"' would terminate the
# identifier early and yield malformed (or injected) SQL.
#
# Returns the result of #execute.
def notify(chan)
  log(:level => :debug, :action => "NOTIFY")
  identifier = chan.to_s.gsub('"', '""')
  execute('NOTIFY "' + identifier + '"') #quotes matter
end

#transaction ⇒ Object



47
48
49
50
51
52
53
54
55
56
# File 'lib/queue_classic/conn.rb', line 47

# Runs the given block between BEGIN and COMMIT statements.
#
# If BEGIN, the block, or COMMIT raises, a ROLLBACK is attempted and the
# exception that triggered it is re-raised to the caller. Should the
# ROLLBACK itself fail, that secondary failure is logged and the original
# exception is still the one raised, so callers always see the real cause.
#
# Returns the result of executing COMMIT.
def transaction
  execute("BEGIN")
  yield
  execute("COMMIT")
rescue Exception => original
  # Exception (not just StandardError) is rescued on purpose: the rollback
  # should also be attempted on interrupts and exits, and the exception is
  # always re-raised below.
  begin
    execute("ROLLBACK")
  rescue StandardError => rollback_error
    log(:level => :error, :action => "rollback_failed", :message => rollback_error.inspect)
  end
  raise original
end

#transaction_idle? ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/queue_classic/conn.rb', line 58

# Reports whether the connection's transaction status equals
# PGconn::PQTRANS_IDLE.
#
# Returns the result of that comparison.
def transaction_idle?
  status = connection.transaction_status
  status == PGconn::PQTRANS_IDLE
end

#unlisten(chan) ⇒ Object



30
31
32
33
# File 'lib/queue_classic/conn.rb', line 30

# Unsubscribes the connection from the notification channel +chan+ by
# issuing an UNLISTEN statement through #execute.
#
# chan - the channel name; anything responding to to_s (String, Symbol).
#
# The name is sent as a double-quoted SQL identifier. Any double quote that
# is part of the name is doubled, which is how a quote is written inside a
# quoted identifier; without this a name containing '"' would terminate the
# identifier early and yield malformed (or injected) SQL.
#
# Returns the result of #execute.
def unlisten(chan)
  log(:level => :debug, :action => "UNLISTEN")
  identifier = chan.to_s.gsub('"', '""')
  execute('UNLISTEN "' + identifier + '"') #quotes matter
end

#wait_for_notify(t) ⇒ Object



41
42
43
44
45
# File 'lib/queue_classic/conn.rb', line 41

# Delegates to the connection's wait_for_notify with +t+, writing a debug log
# entry whenever the connection yields a notification.
#
# t - passed straight through to the connection's wait_for_notify
#     (presumably a timeout — confirm against the pg documentation).
#
# Returns whatever the connection's wait_for_notify returns.
def wait_for_notify(t)
  # The yielded values are accepted but unused.
  on_notification = proc do |_event, _pid, _msg|
    log(:level => :debug, :action => "received_notification")
  end
  connection.wait_for_notify(t, &on_notification)
end