Class: Rake::DataTask::Postgres

Inherits:
Db
  • Object
show all
Extended by:
ConnectionPersistence
Includes:
StandardBooleans, StandardTransactions
Defined in:
lib/data_task/adapters/postgres.rb

Direct Known Subclasses

Greenplum

Constant Summary

Constants inherited from Db

Db::LOG, Db::TABLE_TRACKER_COLUMNS, Db::TABLE_TRACKER_NAME

Instance Method Summary collapse

Methods included from ConnectionPersistence

persist_connection, persisted_connection

Methods inherited from Db

#operation_values, #relation_type_values

Constructor Details

#initialize(options) ⇒ Postgres

Connect to a PostgreSQL database.

If we’ve already used this class to connect to the same host, port, and database with the same username, re-use that connection for this instance.

Parameters:

  • options (Hash)

    the connection parameters

Options Hash (options):

  • 'host' (String)

    the server hostname or IP address

  • 'port' (Integer)

    the server port number

  • 'database' (String)

    the database name

  • 'username' (String)

    the name of the database user to connect as

  • 'password' (String)

    the database user’s password



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/data_task/adapters/postgres.rb', line 30

# Connect to a PostgreSQL database.
#
# A connection already persisted for the same host, port, database and
# username is reused; otherwise a new one is opened, given a notice
# processor that forwards server notices to LOG, and persisted.
#
# @param options [Hash] the connection parameters
# @option options [String] 'host' the server hostname or IP address (defaults to 'localhost')
# @option options [Integer] 'port' the server port number (defaults to 5432)
# @option options [String] 'database' the database name
# @option options [String] 'username' the name of the database user to connect as
# @option options [String] 'password' the database user's password (defaults to '')
def initialize(options)
  # the password is deliberately not part of the key used to match connections
  conn_options = {
    :host => options['host'] || 'localhost',
    :port => options['port'] || 5432,
    :database => options['database'],
    :username => options['username']
  }

  # always reuse an existing connection if it matches on these connection options
  @connection = self.class.persisted_connection(conn_options)

  if @connection.nil?
    # nothing to reuse: create and persist a new connection
    @connection = PG::Connection.new(
      conn_options[:host],
      conn_options[:port],
      nil,
      nil,
      conn_options[:database],
      conn_options[:username],
      options['password'] || ''
    )
    @connection.set_notice_processor do |msg|
      # notices that start a line with "ERROR:" are logged as errors, the rest as info
      severity = msg =~ /^ERROR:/ ? :error : :info
      LOG.public_send(severity, 'psql') { msg.gsub(/\n/,'; ') }
    end
    self.class.persist_connection(@connection, conn_options)
  end

  # set up tracking if it isn't set up already
  set_up_tracking unless tracking_tables?
end

Instance Method Details

#[](name) ⇒ Object



68
69
70
# File 'lib/data_task/adapters/postgres.rb', line 68

# Wrap a name in a Data object bound to this adapter.
#
# @param name the name handed to Data.new
# @return the Data instance built from +name+ and this adapter
def [](name)
  Data.new(name, self)
end

#create_table(table_name, data_definition, column_definitions, track_table = true) ⇒ Object Also known as: create_data



204
205
206
207
208
209
210
211
212
213
214
# File 'lib/data_task/adapters/postgres.rb', line 204

# Drop and recreate a table, optionally filling it from a query and tracking it.
#
# @param table_name [String] name of the table to (re)create
# @param data_definition [String, nil] query appended as "as <query>"; nil omits the clause
# @param column_definitions [String] SQL placed directly after the table name
# @param track_table [Boolean] when truthy, create tracking rules for the table
#   and record its creation
# @return the result of track_creation when tracking, nil otherwise
def create_table(table_name, data_definition, column_definitions, track_table = true)
  drop_table table_name

  # a nil clause interpolates as an empty string
  as_clause = data_definition.nil? ? nil : "as #{data_definition}"
  execute <<-EOSQL
    create table #{table_name} #{column_definitions}
    #{as_clause}
  EOSQL

  return unless track_table

  create_tracking_rules(table_name)
  track_creation table_name, 0
end

#create_view(view_name, view_definition) ⇒ Object



218
219
220
221
222
223
224
# File 'lib/data_task/adapters/postgres.rb', line 218

# Drop any existing view of this name, then create it from the given definition.
#
# @param view_name [String] name of the view to (re)create
# @param view_definition [String] the query placed after "create view ... as"
# @return whatever execute returns for the create statement
def create_view(view_name, view_definition)
  drop_view view_name

  statement = <<-EOSQL
    create view #{view_name} as
    #{view_definition}
  EOSQL
  execute statement
end

#drop_table(table_name) ⇒ Object Also known as: drop_data



172
173
174
175
176
# File 'lib/data_task/adapters/postgres.rb', line 172

# Drop a table (if it exists, cascading) and remove its tracking records.
#
# The tracker table itself is dropped like any other table, but no
# tracking cleanup is attempted for it.
#
# @param table_name [String] name of the table to drop
# @return the result of track_drop, or nil when the tracker table was dropped
def drop_table(table_name)
  execute "drop table if exists #{table_name} cascade"

  # case-insensitive match against the tracker table's name
  is_tracker = table_name.casecmp(TABLE_TRACKER_NAME) == 0
  track_drop table_name unless is_tracker
end

#drop_view(view_name) ⇒ Object



226
227
228
# File 'lib/data_task/adapters/postgres.rb', line 226

# Drop a view if it exists, cascading to dependent objects.
#
# @param view_name [String] name of the view, interpolated into the statement as-is
# @return whatever execute returns for the drop statement
def drop_view view_name
  execute "drop view if exists #{view_name} cascade"
end

#execute(sql) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/data_task/adapters/postgres.rb', line 90

# Run a SQL statement on the adapter's connection and return the result rows.
#
# On any PG error the current transaction is rolled back and the original
# error is re-raised. The rollback is sent straight to the connection rather
# than through #execute: if the rollback itself fails (for example because the
# connection is gone), recursing would never terminate and would hide the
# error that actually caused the failure.
#
# @param sql [String] the statement to run
# @return the value of calling +values+ on the result of the statement
# @raise [PG::Error] whatever error the statement raised
def execute(sql)
  connect if @connection.nil?

  begin
    @connection.exec(sql).values
  rescue PG::Error => e
    if e.is_a?(PG::UndefinedTable)
      # a missing tracker table gets an actionable hint
      if /ERROR:  relation "(last_operations|.*\.last_operations)" does not exist/ =~ e.message
        LOG.error "Tracking is not set up in this schema. Set up tracking in this schema first."
      end
    else
      LOG.info e.message.chomp
    end

    begin
      @connection.exec "rollback;"
    rescue PG::Error => rollback_error
      # report the failed rollback, but keep surfacing the original error
      LOG.info rollback_error.message.chomp
    end

    raise e
  end
end

#operations_supported ⇒ Object



230
231
232
233
234
235
# File 'lib/data_task/adapters/postgres.rb', line 230

# Report which tracked operations are handled by the database and which
# are left to the application.
#
# @return [Hash] :by_db holds operations_supported_by_db; :by_app holds
#   whichever of :truncate and :create are not in that list
def operations_supported
  by_db = operations_supported_by_db
  by_app = [:truncate, :create] - operations_supported_by_db

  { :by_db => by_db, :by_app => by_app }
end

#reset_tracking(options = {}) ⇒ Object



140
141
142
143
144
145
# File 'lib/data_task/adapters/postgres.rb', line 140

# Clear all tracking records by truncating the tracker table.
#
# The statement is issued directly instead of via truncate_table, because
# truncate_table returns without doing anything when asked to truncate the
# tracker table, which would make this method a no-op.
#
# @param options [Hash]
# @option options [String] :search_path schema search path to truncate under;
#   defaults to the current search path joined with commas
# @return whatever with_search_path returns for the block
def reset_tracking(options = {})
  target_search_path = options[:search_path] || search_path.join(',')
  with_search_path(target_search_path) do
    execute "truncate table #{TABLE_TRACKER_NAME}"
  end
end

#set_up_tracking(options = {}) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/data_task/adapters/postgres.rb', line 119

# (Re)create the tracker table from table_tracker_columns.
#
# Any existing tracking is torn down first. The tracker table itself is
# created with tracking disabled.
#
# @param options [Hash] passed through to tear_down_tracking
# @option options [String] :search_path schema search path to create the table
#   under; defaults to the current search path joined with commas
def set_up_tracking(options = {})
  tear_down_tracking options

  schemas = options[:search_path] || search_path.join(',')
  with_search_path(schemas) do
    # one "<name> <type>" entry per tracker column
    definitions = table_tracker_columns.map { |col, col_defn| "#{col} #{col_defn[:data_type]}" }
    create_table TABLE_TRACKER_NAME, nil, " (#{definitions.join(', ')})", false
  end
end

#table_exists?(table_name, options = {}) ⇒ Boolean Also known as: data_exists?

Returns:

  • (Boolean)


194
195
196
# File 'lib/data_task/adapters/postgres.rb', line 194

# Check whether a table exists by delegating to relation_exists? with the :table type.
#
# @param table_name [String] name of the table to look for
# @param options [Hash] passed through unchanged to relation_exists?
# @return [Boolean]
def table_exists? table_name, options = {}
  relation_exists? table_name, :table, options
end

#table_mtime(qualified_table_name) ⇒ Object Also known as: data_mtime



147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/data_task/adapters/postgres.rb', line 147

# Look up the most recent tracked operation time for a table.
#
# When the name carries no schema, the schema is resolved with first_schema_for.
#
# @param qualified_table_name [String] table name, optionally schema-qualified
# @return the result of Sql.get_single_time applied to the rows holding max(time)
def table_mtime(qualified_table_name)
  parsed_schema, table_name = parse_schema_and_table_name(qualified_table_name)
  schema_name = parsed_schema.nil? ? first_schema_for(table_name) : parsed_schema

  with_search_path(schema_name) do
    rows = execute <<-EOSQL
        select max(time)
        from #{schema_name}.#{TABLE_TRACKER_NAME}
        where relation_name = '#{table_name}'
    EOSQL
    Sql.get_single_time(rows)
  end
end

#table_tracker_columns ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/data_task/adapters/postgres.rb', line 72

# Tracker column definitions adjusted for PostgreSQL.
#
# Starts from the inherited definitions, upcases every enumerated value,
# maps the :table relation type to 'BASE TABLE', and stores times as
# 'timestamp with time zone'.
#
# NOTE(review): the hash returned by super is modified in place; confirm
# that the parent hands out a fresh hash rather than a shared one.
#
# @return the adjusted column definitions
def table_tracker_columns
  cols = super

  # upcase all enum'd column values because system tables store them in upcase
  cols.each_value do |attributes|
    next unless attributes.key?(:values)

    enum_values = attributes[:values]
    enum_values.keys.each { |name| enum_values[name] = enum_values[name].upcase }
  end

  cols[:relation_type][:values][:table] = 'BASE TABLE'
  cols[:time][:data_type] = :'timestamp with time zone'
  cols
end

#tear_down_tracking(options = {}) ⇒ Object



133
134
135
136
137
138
# File 'lib/data_task/adapters/postgres.rb', line 133

# Remove tracking by dropping the tracker table.
#
# @param options [Hash]
# @option options [String] :search_path schema search path to drop the table
#   under; defaults to the current search path joined with commas
# @return whatever with_search_path returns for the block
def tear_down_tracking(options = {})
  schemas = options[:search_path] || search_path.join(',')
  with_search_path(schemas) { drop_table TABLE_TRACKER_NAME }
end

#track_drop(table_name) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/data_task/adapters/postgres.rb', line 178

# Delete a dropped table's records from the tracker table.
#
# The tracker table is looked up in the same schema as the dropped table when
# the name is schema-qualified. Nothing happens if no tracker table exists.
#
# @param table_name [String] name of the dropped table, optionally schema-qualified
# @return whatever execute returns for the delete, or nil when there is no tracker table
def track_drop(table_name)
  schema_name, unqualified_table_name = parse_schema_and_table_name(table_name)

  tracker =
    if schema_name.nil?
      TABLE_TRACKER_NAME
    else
      "#{schema_name}.#{TABLE_TRACKER_NAME}"
    end
  return unless table_exists?(tracker)

  execute <<-EOSQL
      delete from #{tracker}
      where
        relation_name = '#{unqualified_table_name}' and 
        relation_type = '#{relation_type_values[:table]}'
  EOSQL
end

#tracking_tables? ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/data_task/adapters/postgres.rb', line 115

# Report whether the tracker table exists, via data_exists?.
#
# @return [Boolean]
def tracking_tables?
  data_exists?(TABLE_TRACKER_NAME)
end

#truncate_table(table_name) ⇒ Object Also known as: truncate_data



164
165
166
167
168
# File 'lib/data_task/adapters/postgres.rb', line 164

# Truncate a table and record the truncation.
#
# Requests to truncate the tracker table (matched case-insensitively) are
# ignored: nothing is executed and nothing is recorded.
#
# @param table_name [String] name of the table to truncate
# @return the result of track_truncate, or nil when the request was ignored
def truncate_table(table_name)
  unless table_name.casecmp(TABLE_TRACKER_NAME) == 0
    execute "truncate table #{table_name}"
    track_truncate table_name
  end
end

#view_exists?(view_name, options = {}) ⇒ Boolean

Returns:

  • (Boolean)


200
201
202
# File 'lib/data_task/adapters/postgres.rb', line 200

# Check whether a view exists by delegating to relation_exists? with the :view type.
#
# @param view_name [String] name of the view to look for
# @param options [Hash] passed through unchanged to relation_exists?
# @return [Boolean]
def view_exists? view_name, options = {}
  relation_exists? view_name, :view, options
end

#with_role(role) ⇒ Object



245
246
247
248
249
250
251
# File 'lib/data_task/adapters/postgres.rb', line 245

# Run a block under a different database role, then switch back.
#
# The previous role (as reported by current_user) is restored even when the
# block raises, so a failure cannot leave the connection running as +role+.
#
# @param role [String] the role to switch to, interpolated into "set role" as-is
# @yield runs with +role+ active
# @return the block's return value
def with_role(role)
  original_role = current_user
  execute "set role #{role}"

  begin
    yield
  ensure
    execute "set role #{original_role}"
  end
end

#with_search_path(schemas) ⇒ Object



237
238
239
240
241
242
243
# File 'lib/data_task/adapters/postgres.rb', line 237

# Run a block with a different schema search path, then switch back.
#
# The previous search path is restored even when the block raises, so a
# failure cannot leave the connection on the temporary path.
#
# @param schemas [String, Array<String>] schema name(s); an array is joined with commas
# @yield runs with the given search path active
# @return the block's return value
def with_search_path(schemas)
  original_search_path = search_path
  execute "set search_path to #{Array(schemas).join(',')}"

  begin
    yield
  ensure
    execute "set search_path to #{original_search_path.join(',')}"
  end
end