Class: Rake::DataTask::Postgres

Inherits:
Db
  • Object
show all
Extended by:
ConnectionPersistence
Includes:
StandardBooleans, StandardTransactions
Defined in:
lib/data_task/adapters/postgres.rb

Direct Known Subclasses

Greenplum

Constant Summary

Constants inherited from Db

Db::LOG, Db::TABLE_TRACKER_COLUMNS, Db::TABLE_TRACKER_NAME

Instance Method Summary collapse

Methods included from ConnectionPersistence

persist_connection, persisted_connection

Methods inherited from Db

#operation_values, #relation_type_values

Constructor Details

#initialize(options) ⇒ Sqlite

Connect to a PostgreSQL database.

If we’ve already used this class to connect to the same host, port, and database with the same username, re-use that connection for this instance.

Parameters:

  • options (Hash)

    the connection parameters

Options Hash (options):

  • 'host' (String)

    the server hostname or IP address

  • 'port' (Integer)

    the server port number

  • 'database' (String)

    the database name

  • 'username' (String)

    the name of the database user to connect as

  • 'password' (String)

    the database user’s password



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/data_task/adapters/postgres.rb', line 34

def initialize options
  host = options['host'] || 'localhost'
  port = options['port'] || 5432
  database = options['database']
  username = options['username']

  # always reuse an existing connection if it matches on these connection options
  conn_options = {:host => host, :port => port, :database => database, :username => username}
  existing_connection = self.class.persisted_connection(conn_options)

  if existing_connection.nil?
    # create and persist a new connection
    @connection = PG::Connection.new(
      host,
      port,
      nil,
      nil,
      database,
      username,
      options['password'] || ''
    )
    @connection.set_notice_processor do |msg|
      if msg =~ /^ERROR:/
        LOG.error('psql') { msg.gsub(/\n/,'; ') }
      else
        LOG.info('psql') { msg.gsub(/\n/,'; ') }
      end
    end
    self.class.persist_connection(@connection, conn_options)
  else
    # reuse an existing connection
    @connection = existing_connection
  end

  # set up trackig if it isn't set up already
  set_up_tracking if !tracking_tables?
end

Instance Method Details

#[](name) ⇒ Object



72
73
74
# File 'lib/data_task/adapters/postgres.rb', line 72

def [](name)
  Data.new(name, self)
end

#create_table(table_name, data_definition, column_definitions, track_table = true) ⇒ Object Also known as: create_data



208
209
210
211
212
213
214
215
216
217
218
# File 'lib/data_task/adapters/postgres.rb', line 208

def create_table table_name, data_definition, column_definitions, track_table=true
  drop_table table_name
  execute "    create table \#{table_name} \#{column_definitions}\n    \#{ \"as \#{data_definition}\" if !data_definition.nil? }\n  EOSQL\n  if track_table\n    create_tracking_rules(table_name)\n    track_creation table_name, 0\n  end\nend\n"

#create_view(view_name, view_definition) ⇒ Object



222
223
224
225
226
227
228
# File 'lib/data_task/adapters/postgres.rb', line 222

def create_view view_name, view_definition
  drop_view view_name
  execute "    create view \#{view_name} as\n    \#{view_definition}\n  EOSQL\nend\n"

#drop_table(table_name) ⇒ Object Also known as: drop_data



176
177
178
179
180
# File 'lib/data_task/adapters/postgres.rb', line 176

def drop_table table_name
  execute "drop table if exists #{table_name} cascade"
  return if table_name.casecmp(TABLE_TRACKER_NAME) == 0
  track_drop table_name
end

#drop_view(view_name) ⇒ Object



230
231
232
# File 'lib/data_task/adapters/postgres.rb', line 230

def drop_view view_name
  execute "drop view if exists #{view_name} cascade"
end

#execute(sql) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/data_task/adapters/postgres.rb', line 94

def execute sql
  connect if @connection.nil?

  begin

    r = @connection.exec sql
    r.values

  rescue PG::UndefinedTable => e

    if /ERROR:  relation "(last_operations|.*\.last_operations)" does not exist/ =~ e.message
      LOG.error "Tracking is not set up in this schema. Set up tracking in this schema first."
    end
    execute "rollback;"
    raise e

  rescue PGError => e

    LOG.info e.message.chomp
    execute "rollback;"
    raise e

  end
end

#operations_supportedObject



234
235
236
237
238
239
# File 'lib/data_task/adapters/postgres.rb', line 234

def operations_supported
  {
    :by_db => operations_supported_by_db,
    :by_app => [:truncate, :create] - operations_supported_by_db
  }
end

#reset_tracking(options = {}) ⇒ Object



144
145
146
147
148
149
# File 'lib/data_task/adapters/postgres.rb', line 144

def reset_tracking options = {}
  target_search_path = options[:search_path] || search_path.join(',')
  with_search_path(target_search_path) do
    truncate_table TABLE_TRACKER_NAME
  end
end

#set_up_tracking(options = {}) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/data_task/adapters/postgres.rb', line 123

def set_up_tracking options = {}
  tear_down_tracking options

  target_search_path = options[:search_path] || search_path.join(',')
  with_search_path(target_search_path) do

    column_definitions = table_tracker_columns.map do |col, col_defn|
      col.to_s + ' ' + col_defn[:data_type].to_s
    end.join(', ')
    create_table TABLE_TRACKER_NAME, nil, " (#{column_definitions})", false

  end
end

#table_exists?(table_name, options = {}) ⇒ Boolean Also known as: data_exists?

Returns:

  • (Boolean)


198
199
200
# File 'lib/data_task/adapters/postgres.rb', line 198

def table_exists? table_name, options = {}
  relation_exists? table_name, :table, options
end

#table_mtime(qualified_table_name) ⇒ Object Also known as: data_mtime



151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/data_task/adapters/postgres.rb', line 151

def table_mtime qualified_table_name
  schema_name, table_name = parse_schema_and_table_name(qualified_table_name)
  schema_name = first_schema_for(table_name) if schema_name.nil?

  with_search_path(schema_name) do
    Sql.get_single_time(
      execute "        select max(time)\n        from \#{schema_name}.\#{TABLE_TRACKER_NAME}\n        where relation_name = '\#{table_name}'\n      EOSQL\n    )\n  end\nend\n"

#table_tracker_columnsObject



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/data_task/adapters/postgres.rb', line 76

def table_tracker_columns
  # upcase all enum'd column values because system tables store them in upcase
  cols = super
  cols.each do |k1,v1|
    cols[k1].each do |k2, v2|
      if k2 == :values
        cols[k1][k2].each do |k3, v3|
          cols[k1][k2][k3] = v3.upcase
        end
      end
    end
  end

  cols[:relation_type][:values][:table] = 'BASE TABLE'
  cols[:time][:data_type] = :'timestamp with time zone'
  cols
end

#tear_down_tracking(options = {}) ⇒ Object



137
138
139
140
141
142
# File 'lib/data_task/adapters/postgres.rb', line 137

def tear_down_tracking options = {}
  target_search_path = options[:search_path] || search_path.join(',')
  with_search_path(target_search_path) do
    drop_table TABLE_TRACKER_NAME
  end
end

#track_drop(table_name) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/data_task/adapters/postgres.rb', line 182

def track_drop table_name
  schema_name, unqualified_table_name = parse_schema_and_table_name(table_name)
  table_tracker_name = schema_name.nil? ? TABLE_TRACKER_NAME : "#{schema_name}.#{TABLE_TRACKER_NAME}"

  if table_exists?(table_tracker_name)
    execute "      delete from \#{table_tracker_name}\n      where\n        relation_name = '\#{unqualified_table_name}' and \n        relation_type = '\#{relation_type_values[:table]}'\n    EOSQL\n  end\nend\n"

#tracking_tables?Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/data_task/adapters/postgres.rb', line 119

def tracking_tables?
  data_exists?(TABLE_TRACKER_NAME)
end

#truncate_table(table_name) ⇒ Object Also known as: truncate_data



168
169
170
171
172
# File 'lib/data_task/adapters/postgres.rb', line 168

def truncate_table table_name
  return if table_name.casecmp(TABLE_TRACKER_NAME) == 0
  execute "truncate table #{table_name}"
  track_truncate table_name
end

#view_exists?(view_name, options = {}) ⇒ Boolean

Returns:

  • (Boolean)


204
205
206
# File 'lib/data_task/adapters/postgres.rb', line 204

def view_exists? view_name, options = {}
  relation_exists? view_name, :view, options
end

#with_role(role) ⇒ Object



249
250
251
252
253
254
255
# File 'lib/data_task/adapters/postgres.rb', line 249

def with_role role
  original_role = current_user
  execute "set role #{role}"
  r = yield
  execute "set role #{original_role}"
  r
end

#with_search_path(schemas) ⇒ Object



241
242
243
244
245
246
247
# File 'lib/data_task/adapters/postgres.rb', line 241

def with_search_path schemas
  original_search_path = search_path
  execute "set search_path to #{Array(schemas).join(',')}"
  r = yield
  execute "set search_path to #{original_search_path.join(',')}"
  r
end