Class: Flydata::SourcePostgresql::TableMeta

Inherits:
TableMeta
  • Object
show all
Defined in:
lib/flydata/source_postgresql/table_meta.rb

Overview

Fetch and keep table meta information

<table-name(Symbol)>:
  table_name: <String>               # Table name
  table_schema: <String> or nil      # Schema name
  primary_keys: <Array of String>    # Set primary key names. ex: ['group_id', 'category_id']
  pk_positions: <Array of String>    # Set the 1-based position of primary keys, stored as strings. ex: ['1', '3']
  max_row_size: <Integer>            # byte, calculated based on column size
  max_num_rows_per_query: <Integer>  # max number of rows per query
  raw_columns: <Hash>                # raw information schema data
  columns: table_def.columns
  table_def:
    columns:
      table:
      column:
      type:
      not_null:
      primary_key:
      default:
      column_size: (new)  # Set in `PostgresqlTableDef.parse_one_column_def`

Constant Summary collapse

# SQL template returning one row per (column, index) pair for the requested
# tables: information_schema.columns right-joined with pg_index/pg_attribute.
# The %{database}, %{schema} and %{tables} placeholders are filled in with
# String#%.
#
# The heredoc is deliberately non-interpolating (<<'EOT'): with a plain <<EOT
# Ruby would turn "\." into "." and the money-scale regex would match any
# character followed by trailing digits instead of a literal decimal point.
GET_TABLE_META_SQL =
<<'EOT'
SELECT
  c.table_catalog,
  c.table_schema,
  c.table_name,
  c.column_name,
  c.ordinal_position,
  c.column_default,
  c.is_nullable,
  c.data_type,
  c.character_maximum_length,
  c.character_octet_length,
  CASE WHEN c.data_type='money' THEN 19
   ELSE c.numeric_precision END AS numeric_precision,
  c.numeric_precision_radix,
  CASE WHEN c.data_type='money' THEN (
    SELECT CASE WHEN scale IS NULL THEN 0 ELSE scale END
      FROM (SELECT length(substring('999999'::money::char varying, '\.([0-9]+)$')) AS scale) s)
   ELSE c.numeric_scale END AS numeric_scale,
  c.datetime_precision,
  i.indexrelid::regclass as index_name,
  i.indisprimary as is_primary,
  i.indisunique as is_unique,
  i.indisvalid as is_valid,
  CASE WHEN i.indpred!='NULL' THEN true ELSE false END AS is_partial
FROM
  pg_index i
  JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
  RIGHT JOIN
    (SELECT
      (table_schema ||'.'|| table_name)::regclass AS regid,
      *
     FROM information_schema.columns
     WHERE
       table_catalog = '%{database}'
       AND table_schema IN (%{schema})
       AND table_name IN (%{tables})
    ) c
  ON i.indrelid = c.regid AND a.attname = c.column_name
WHERE
  c.table_catalog = '%{database}'
  AND c.table_schema IN (%{schema})
  AND c.table_name IN (%{tables})
ORDER BY
  c.table_name, c.ordinal_position, i.indisprimary desc, i.indisunique desc, i.indisvalid asc;
EOT
# SQL that returns the result of PostgreSQL's txid_current_snapshot().
GET_CURRENT_SNAPSHOT_SQL = 'SELECT txid_current_snapshot();'
# Value stored as :max_num_rows_per_query for every table in #build_table_meta.
DEFAULT_MAX_FETCH_RECORD_SIZE = 50_000

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from TableMeta

#[]

Constructor Details

#initialize(options, tables, schema = nil) ⇒ TableMeta

DEFAULT_MAX_FETCH_RECORD_SIZE = 8


85
86
87
88
89
90
91
# File 'lib/flydata/source_postgresql/table_meta.rb', line 85

# Remembers the connection options, the tables to inspect and the schema.
#
# @param options [Hash] read for :dbname, :database or 'database' (database
#   name) and for :schema or 'schema' (fallback schema name); also kept as-is
# @param tables [Object] table names, stored without modification
# @param schema [#to_s, nil] schema name; takes precedence over the options
def initialize(options, tables, schema = nil)
  @options = options
  @database = options[:dbname] ||
              options[:database] ||
              options['database']
  @tables = tables

  # Surrounding whitespace is dropped; a blank schema is stored as nil.
  requested_schema = schema || options[:schema] || options['schema']
  normalized = requested_schema.to_s.strip
  @schema = normalized.empty? ? nil : normalized
end

Instance Attribute Details

#current_snapshotObject (readonly)

Returns the value of attribute current_snapshot.


93
94
95
# File 'lib/flydata/source_postgresql/table_meta.rb', line 93

# Reader for @current_snapshot, which #reload assigns; nil while unassigned.
def current_snapshot
  instance_variable_get(:@current_snapshot)
end

Instance Method Details

#build_table_meta(columns) ⇒ Object


122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/flydata/source_postgresql/table_meta.rb', line 122

# Groups raw metadata rows by table and derives the per-table meta hash.
#
# For each table the rows are handed to
# FlydataCore::TableDef::PostgresqlTableDef.create. When that raises
# FlydataCore::TableDefError the table only gets :table_name and
# :table_def_err (next to :raw_columns); otherwise it gets :table_name,
# :table_schema, :primary_keys, :pk_positions, :max_num_rows_per_query,
# :columns and :table_def.
#
# @param columns [Enumerable<Hash>] rows carrying at least the 'table_name'
#   and 'column_name' keys
# @return [Hash{Symbol=>Hash}] meta information keyed by table name
def build_table_meta(columns)
  ret = Hash.new { |h, k| h[k] = {} }

  # Put ret[<table-name-sym>][:raw_columns][<column-name-sym>] = [row, ...].
  # Rows sharing a column name are accumulated in one array.
  columns.each do |col|
    column_name = col['column_name'].to_sym
    t_meta = ret[col['table_name'].to_sym]
    t_meta[:raw_columns] ||= Hash.new { |h, k| h[k] = [] }
    t_meta[:raw_columns][column_name] << col
  end

  ret.each do |table_name, t_meta|
    begin
      table_def = FlydataCore::TableDef::PostgresqlTableDef.create(
        t_meta[:raw_columns].values, @options
      )
    rescue FlydataCore::TableDefError => e
      # Record the failure and skip the table. The name is stored as a String
      # so that :table_name has the same type in both outcomes.
      t_meta.merge!(
        table_name: table_name.to_s,
        table_def_err: e,
      )
      next
    end

    # Primary-key names plus their 1-based positions (kept as strings) within
    # table_def.columns.
    primary_keys = []
    pk_positions = []
    table_def.columns.each.with_index(1) do |col, index|
      next unless col[:primary_key]

      primary_keys << col[:column]
      pk_positions << index.to_s
    end

    t_meta.merge!(
      table_name: table_name.to_s,
      table_schema: @schema,
      primary_keys: primary_keys,
      pk_positions: pk_positions,
      #max_row_size: max_row_size,                  #TODO: calculation
      #max_num_rows_per_query: max_row_size + 128,  #TODO: calculation
      max_num_rows_per_query: DEFAULT_MAX_FETCH_RECORD_SIZE,
      columns: table_def.columns,
      table_def: table_def,
    )
  end

  ret
end

#calc_column_size(column) ⇒ Object


173
174
175
176
# File 'lib/flydata/source_postgresql/table_meta.rb', line 173

# Placeholder: returns the same fixed size for every column and does not
# inspect its argument yet.
#
# @param column [Object] currently unused
# @return [Integer] always 124
def calc_column_size(column)
  #TODO: Implement the check logic based on column type
  fixed_estimate = 124
  fixed_estimate
end

#reload(pg_client = nil) ⇒ Object


95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/flydata/source_postgresql/table_meta.rb', line 95

# Re-reads the table meta information and the current snapshot.
#
# Runs GET_TABLE_META_SQL for @database / @schema / @tables, stores the result
# of #build_table_meta in @table_meta, then runs GET_CURRENT_SNAPSHOT_SQL and
# stores a FlydataCore::Postgresql::Snapshot in @current_snapshot.
#
# @param pg_client [#query, nil] connection to use; when nil a
#   FlydataCore::Postgresql::PGClient is created from @options and closed
#   again before returning
# @return [self]
def reload(pg_client = nil)
  # The values end up inside single-quoted SQL literals, so an embedded single
  # quote has to be doubled or the statement would be malformed.
  escape = ->(value) { value.to_s.gsub("'", "''") }

  # Without an explicit schema, let the server resolve its current schema.
  schema = @schema ? "'#{escape.call(@schema)}'" : "select current_schema"
  sql = GET_TABLE_META_SQL % {
    database: escape.call(@database), # the template adds the surrounding quotes
    schema: schema,
    tables: @tables.collect { |t| "'#{escape.call(t)}'" }.join(','),
  }

  conn = pg_client
  if conn.nil?
    # Track connections opened here separately so only those get closed.
    local_conn = conn = FlydataCore::Postgresql::PGClient.new(@options)
  end

  # Set table_meta
  columns = conn.query(sql)
  @table_meta = build_table_meta(columns)

  # Set current snapshot
  current_snapshot_str = conn.
    query(GET_CURRENT_SNAPSHOT_SQL).first['txid_current_snapshot']
  @current_snapshot = FlydataCore::Postgresql::Snapshot.new(current_snapshot_str)

  self
ensure
  if local_conn
    begin
      local_conn.close
    rescue StandardError
      # Closing is best-effort; a failure here must not mask the real result.
      nil
    end
  end
end