Class: Flydata::SourcePostgresql::TableMeta

Inherits:
TableMeta
  • Object
show all
Defined in:
lib/flydata/source_postgresql/table_meta.rb

Overview

Fetches and holds table metadata for a set of PostgreSQL tables.

<table-name(Symbol)>:
  table_name: <String>               # Table name
  table_schema: <String> or nil      # Schema name
  primary_keys: <Array of String>    # Set primary key names. ex: ['group_id', 'category_id']
  pk_positions: <Array of String>    # 1-based positions of primary keys within `columns`, as strings. ex: ['1', '3']
  max_row_size: <Integer>            # byte, calculated based on column size (not set yet: calculation is a TODO)
  max_num_rows_per_query: <Integer>  # max number of rows per query
  raw_columns: <Hash>                # raw information schema data
  columns: table_def.columns
  table_def:
    columns:
      table:
      column:
      type:
      not_null:
      primary_key:
      default:
      column_size: (new)  # Set in `PostgresqlTableDef.parse_one_column_def`

Constant Summary collapse

# Column metadata for the requested tables, joined with pg_index so each
# column row carries whether it is part of the primary key.
#
# Placeholders are filled with String#% by #reload:
#   %{database} - substituted inside single quotes
#   %{schema}   - an already-quoted SQL fragment (literal or sub-select)
#   %{tables}   - a comma-separated list of quoted table names
#
# A column that belongs to several indexes yields several rows; the ORDER BY
# puts the indisprimary = true row for a column after the false rows.
#
# The heredoc is single-quoted so the backslash in the money-scale regex
# ('\.([0-9]+)$') reaches PostgreSQL. In a double-quoted (interpolating)
# heredoc Ruby drops the backslash and '.' would match any character.
# Identifiers go through quote_ident before the ::regclass cast so mixed-case
# or special-character names resolve to the right relation.
GET_TABLE_META_SQL = <<'EOT'
SELECT
  c.table_catalog,
  c.table_schema,
  c.table_name,
  c.column_name,
  c.ordinal_position,
  c.column_default,
  c.is_nullable,
  c.data_type,
  c.character_maximum_length,
  c.character_octet_length,
  CASE WHEN c.data_type='money' THEN 19
   ELSE c.numeric_precision END AS numeric_precision,
  c.numeric_precision_radix,
  CASE WHEN c.data_type='money' THEN (
    SELECT CASE WHEN scale IS NULL THEN 0 ELSE scale END
      FROM (SELECT length(substring('999999'::money::char varying, '\.([0-9]+)$')) AS scale) s)
   ELSE c.numeric_scale END AS numeric_scale,
  c.datetime_precision,
  i.indisprimary AS is_primary
FROM
  pg_index i
  JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
  RIGHT JOIN
    (SELECT
      (quote_ident(table_catalog) ||'.'|| quote_ident(table_schema) ||'.'|| quote_ident(table_name))::regclass AS regid,
      *
     FROM information_schema.columns
    ) c
  ON i.indrelid = c.regid AND a.attname = c.column_name
WHERE
  c.table_catalog = '%{database}'
  AND c.table_schema IN (%{schema})
  AND c.table_name IN (%{tables})
ORDER BY
  c.table_name, c.ordinal_position, i.indisprimary asc;
EOT
# Query for the server's current transaction snapshot; #reload reads the
# 'txid_current_snapshot' column of its first row.
GET_CURRENT_SNAPSHOT_SQL = 'SELECT txid_current_snapshot();'
# Used as max_num_rows_per_query for every table while the size-based
# calculation is still a TODO.
DEFAULT_MAX_FETCH_RECORD_SIZE = 50_000

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from TableMeta

#[]

Constructor Details

#initialize(options, tables, schema = nil) ⇒ TableMeta

DEFAULT_MAX_FETCH_RECORD_SIZE = 8



77
78
79
80
81
82
83
# File 'lib/flydata/source_postgresql/table_meta.rb', line 77

# Remembers the connection options and the tables to describe.
#
# @param options [Hash] connection options. The database name is the first
#   truthy value of options[:dbname], options[:database], options['database'].
# @param tables [Array] names of the tables whose meta will be fetched
# @param schema [String, nil] schema name. When nil, options[:schema] and
#   then options['schema'] are used. The chosen value is stripped, and a
#   blank result is stored as nil (reload then queries `current_schema`).
def initialize(options, tables, schema = nil)
  @options = options
  @database = options[:dbname] || options[:database] || options['database']
  @tables = tables

  schema_name = (schema || options[:schema] || options['schema']).to_s.strip
  @schema = (schema_name unless schema_name.empty?)
end

Instance Attribute Details

#current_snapshot ⇒ Object (readonly)

Returns the value of attribute current_snapshot.



85
86
87
# File 'lib/flydata/source_postgresql/table_meta.rb', line 85

# Transaction snapshot recorded by the last #reload, or nil if none has been
# recorded.
#
# @return [Object, nil]
def current_snapshot; @current_snapshot; end

Instance Method Details

#build_table_meta(columns) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/flydata/source_postgresql/table_meta.rb', line 114

# Builds the per-table meta hash from rows returned by GET_TABLE_META_SQL.
#
# @param columns [Enumerable<Hash>] result rows with String keys such as
#   'table_name' and 'column_name'
# @return [Hash{Symbol => Hash}] table-name symbol => meta hash. A table whose
#   column definitions fail to parse only gets :raw_columns, :table_name
#   (a String, as for parsed tables) and :table_def_err.
def build_table_meta(columns)
  ret = Hash.new { |h, k| h[k] = {} }

  # Group raw rows by table and column. A column that is part of several
  # indexes arrives in several rows; later rows overwrite earlier ones, and
  # the query sorts by indisprimary ascending so a true row is stored last.
  columns.each do |col|
    t_meta = ret[col['table_name'].to_sym]
    (t_meta[:raw_columns] ||= {})[col['column_name'].to_sym] = col
  end

  ret.each do |table_name, t_meta|
    begin
      table_def = FlydataCore::TableDef::PostgresqlTableDef.create(
                  t_meta[:raw_columns].values, @options)
    rescue FlydataCore::TableDefError => e
      # Keep the error for callers to report. Store the name as a String so
      # it matches the successful branch below.
      t_meta.merge!(
        table_name: table_name.to_s,
        table_def_err: e,
      )
      # Skip when getting an error when parsing the columns
      next
    end

    primary_keys = []
    pk_positions = []
    table_def.columns.each.with_index(1) do |col, index|
      next unless col[:primary_key]
      primary_keys << col[:column]
      # 1-based position within table_def.columns, stored as a String.
      pk_positions << index.to_s
    end

    t_meta.merge!(
      table_name: table_name.to_s,
      table_schema: @schema,
      primary_keys: primary_keys,
      pk_positions: pk_positions,
      #max_row_size: max_row_size,                  #TODO: calculation
      #max_num_rows_per_query: max_row_size + 128,  #TODO: calculation
      max_num_rows_per_query: DEFAULT_MAX_FETCH_RECORD_SIZE,
      columns: table_def.columns,
      table_def: table_def,
    )
  end

  ret
end

#calc_column_size(column) ⇒ Object



165
166
167
168
# File 'lib/flydata/source_postgresql/table_meta.rb', line 165

# Placeholder estimate of a column's size.
#
# @param column [Object] column definition; not inspected yet
# @return [Integer] always 124 until per-type sizing is implemented
def calc_column_size(column)
  # TODO: derive the size from the column type instead of a fixed value.
  124
end

#reload(pg_client = nil) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/flydata/source_postgresql/table_meta.rb', line 87

# Queries information_schema for the configured tables, rebuilds the table
# meta hash, and records the current transaction snapshot.
#
# Database, schema and table names are embedded in GET_TABLE_META_SQL as SQL
# string literals. Single quotes in them are doubled, so a name containing
# `'` cannot end the literal early. This assumes standard_conforming_strings
# is on, which is the PostgreSQL default.
#
# @param pg_client [Object, nil] an open client responding to #query. When
#   nil, a FlydataCore::Postgresql::PGClient is created from @options and
#   closed before returning.
# @return [self]
def reload(pg_client = nil)
  # Escapes a value for use inside a '...' SQL string literal.
  escape = ->(value) { value.to_s.gsub("'", "''") }

  schema = @schema ? "'#{escape.call(@schema)}'" : "select current_schema"
  sql = GET_TABLE_META_SQL % {
    database: escape.call(@database),
    schema: schema,
    tables: @tables.map { |t| "'#{escape.call(t)}'" }.join(','),
  }

  conn = pg_client
  local_conn = conn = FlydataCore::Postgresql::PGClient.new(@options) if conn.nil?

  # Set table_meta
  @table_meta = build_table_meta(conn.query(sql))

  # Set current snapshot
  current_snapshot_str = conn.query(GET_CURRENT_SNAPSHOT_SQL).first['txid_current_snapshot']
  @current_snapshot = FlydataCore::Postgresql::Snapshot.new(current_snapshot_str)

  self
ensure
  # Close only a connection opened here. Close errors are ignored
  # (best-effort) so they do not mask the result or the original exception.
  if local_conn
    begin
      local_conn.close
    rescue StandardError
      nil
    end
  end
end