Module: PgQueue::PgExtensions

Defined in:
lib/pg_queue/pg_extensions.rb

Instance Method Summary

Instance Method Details

#delete(table_name, where) ⇒ Object



15
16
17
18
19
# File 'lib/pg_queue/pg_extensions.rb', line 15

# Deletes rows from a table that match every column/value pair in +where+.
#
# Column names and the table name are interpolated into the statement
# verbatim; only the values are passed as bind parameters ($1, $2, ...).
#
# @param table_name [#to_s] table to delete from
# @param where [Hash] column => value pairs, combined with AND
# @return [Object] whatever +exec+ returns
def delete(table_name, where)
  # Placeholders are numbered from 1, in the same order as where.values.
  clauses = where.each_with_index.map do |(column, _value), position|
    "#{column} = $#{position + 1}"
  end
  exec("DELETE FROM #{table_name} WHERE #{clauses.join(' AND ')}", where.values)
end

#first(sql) ⇒ Object



21
22
23
24
25
# File 'lib/pg_queue/pg_extensions.rb', line 21

# Runs a query and returns its first row.
#
# @param sql [String] statement passed to +exec+ as-is
# @return [Object, nil] the element at index 0 of the result, or nil when
#   the result reports no rows
def first(sql)
  rows = exec(sql)
  # Check the count before indexing so an empty result yields nil.
  rows.count > 0 ? rows[0] : nil
end

#insert(table_name, hash, returning = nil) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
# File 'lib/pg_queue/pg_extensions.rb', line 3

# Inserts one row built from a column => value hash.
#
# Column names, the table name and the RETURNING expression are
# interpolated verbatim; the values are passed as bind parameters.
#
# @param table_name [#to_s] table to insert into
# @param hash [Hash] column => value pairs for the new row
# @param returning [#to_s, nil] optional RETURNING expression
# @return [Object] with +returning+, the value at row 0 / column 0 of the
#   result; otherwise whatever +exec+ returns
def insert(table_name, hash, returning = nil)
  columns = hash.keys.join(", ")
  placeholders = (1..hash.size).map { |position| "$#{position}" }.join(", ")
  statement = "INSERT INTO #{table_name} (#{columns}) VALUES (#{placeholders})"

  return exec(statement, hash.values) unless returning

  exec("#{statement} RETURNING #{returning}", hash.values).getvalue(0, 0)
end

#listen(key) ⇒ Object



33
34
35
# File 'lib/pg_queue/pg_extensions.rb', line 33

# Issues a LISTEN statement for the given channel.
#
# @param key [#to_s] channel name, interpolated into the statement verbatim
# @return [Object] whatever +exec+ returns
def listen(key)
  statement = "LISTEN #{key}"
  exec(statement)
end

#new_connection ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/pg_queue/pg_extensions.rb', line 41

# Opens a new connection and mixes PgExtensions into it.
#
# The connection settings come from +host+, +port+, +user+, +pass+ and +db+,
# which are called on self and are not defined in this view
# (NOTE(review): presumably supplied by the object that mixes this in).
#
# @return [Object] the object returned by PGconn.open, extended with
#   PgExtensions
def new_connection
  settings = {
    :host => host,
    :port => port,
    :user => user,
    :password => pass,
    :dbname => db
  }
  connection = PGconn.open(settings)
  connection.extend(PgExtensions)
  connection
end

#notify(key, message = nil) ⇒ Object



27
28
29
30
31
# File 'lib/pg_queue/pg_extensions.rb', line 27

# Issues a NOTIFY on the given channel, optionally with a payload.
#
# The payload is embedded as a SQL string literal. Single quotes inside it
# are doubled so that a message such as "it's done" no longer terminates the
# literal early (which produced a syntax error or let extra SQL through).
# NOTE(review): assumes standard_conforming_strings is on, so backslashes
# need no escaping — confirm for legacy server configurations.
#
# @param key [#to_s] channel name, interpolated into the statement verbatim
# @param message [#to_s, nil] optional payload; omitted when nil or false
# @return [Object] whatever +exec+ returns
def notify(key, message = nil)
  statement = "NOTIFY #{key}"
  if message
    # Standard SQL escaping: a quote inside a literal is written as two quotes.
    payload = message.to_s.gsub("'", "''")
    statement = "#{statement}, '#{payload}'"
  end
  exec(statement)
end

#unlisten(key) ⇒ Object



37
38
39
# File 'lib/pg_queue/pg_extensions.rb', line 37

# Issues an UNLISTEN statement for the given channel.
#
# @param key [#to_s] channel name, interpolated into the statement verbatim
# @return [Object] whatever +exec+ returns
def unlisten(key)
  statement = "UNLISTEN #{key}"
  exec(statement)
end