Class: Flydata::SourcePostgresql::TableMeta
- Defined in:
- lib/flydata/source_postgresql/table_meta.rb
Overview
Fetch and keep table meta information
<table-name(Symbol)>:
table_name: <String> # Table name
table_schema: <String> or nil # Schema name
primary_keys: <Array of String> # Set primary key names. ex: ['group_id', 'category_id']
pk_positions: <Array of String> # Set the 1-based ordinal positions of the primary keys, stored as strings. ex: ['1', '3']
max_row_size: <Integer> # byte, calculated based on column size (TODO: not populated yet; the calculation is commented out in build_table_meta)
max_num_rows_per_query: <Integer> # max number of rows per query
raw_columns: <Hash> # raw information schema data
columns: table_def.columns
table_def:
columns:
table:
column:
type:
not_null:
primary_key:
default:
column_size: (new) # Set in `PostgresqlTableDef.parse_one_column_def`
Constant Summary collapse
- GET_TABLE_META_SQL =
<<EOT SELECT c.table_catalog, c.table_schema, c.table_name, c.column_name, c.ordinal_position, c.column_default, c.is_nullable, c.data_type, c.character_maximum_length, c.character_octet_length, CASE WHEN c.data_type='money' THEN 19 ELSE c.numeric_precision END AS numeric_precision, c.numeric_precision_radix, CASE WHEN c.data_type='money' THEN ( SELECT CASE WHEN scale IS NULL THEN 0 ELSE scale END FROM (SELECT length(substring('999999'::money::char varying, '\.([0-9]+)$')) AS scale) s) ELSE c.numeric_scale END AS numeric_scale, c.datetime_precision, i.indexrelid::regclass as index_name, i.indisprimary as is_primary, i.indisunique as is_unique, i.indisvalid as is_valid, CASE WHEN i.indpred!='NULL' THEN true ELSE false END AS is_partial FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) RIGHT JOIN (SELECT (table_schema ||'.'|| table_name)::regclass AS regid, * FROM information_schema.columns WHERE table_catalog = '%{database}' AND table_schema IN (%{schema}) AND table_name IN (%{tables}) ) c ON i.indrelid = c.regid AND a.attname = c.column_name WHERE c.table_catalog = '%{database}' AND c.table_schema IN (%{schema}) AND c.table_name IN (%{tables}) ORDER BY c.table_name, c.ordinal_position, i.indisprimary desc, i.indisunique desc, i.indisvalid asc; EOT
- GET_CURRENT_SNAPSHOT_SQL =
"SELECT txid_current_snapshot();"
- DEFAULT_MAX_FETCH_RECORD_SIZE =
50000
Instance Attribute Summary collapse
-
#current_snapshot ⇒ Object
readonly
Returns the value of attribute current_snapshot.
Instance Method Summary collapse
- #build_table_meta(columns) ⇒ Object
- #calc_column_size(column) ⇒ Object
-
#initialize(options, tables, schema = nil) ⇒ TableMeta
constructor
DEFAULT_MAX_FETCH_RECORD_SIZE = 8.
- #reload(pg_client = nil) ⇒ Object
Methods inherited from TableMeta
Constructor Details
#initialize(options, tables, schema = nil) ⇒ TableMeta
DEFAULT_MAX_FETCH_RECORD_SIZE = 8
85 86 87 88 89 90 91 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 85
#
# Stores the connection options, the table list and the schema to inspect.
#
# @param options [Hash] connection options. The database name is read from
#   +:dbname+, +:database+ or +'database'+ (first non-nil wins); the schema
#   falls back to +:schema+ or +'schema'+ when the +schema+ argument is nil.
# @param tables [Array] table names whose meta information is fetched on reload.
# @param schema [String, nil] schema name; takes precedence over the options.
def initialize(options, tables, schema = nil)
  @options = options
  @database = options[:dbname] || options[:database] || options['database']
  @tables = tables
  # Normalize the schema: surrounding whitespace is stripped and a blank
  # value is stored as nil so callers can test it with a plain truthiness check.
  @schema = (schema || options[:schema] || options['schema']).to_s.strip
  @schema = @schema.empty? ? nil : @schema
end
Instance Attribute Details
#current_snapshot ⇒ Object (readonly)
Returns the value of attribute current_snapshot
93 94 95 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 93
#
# Returns the value of attribute current_snapshot.
#
# @return [Object, nil] the value held in @current_snapshot; nil until it
#   has been assigned.
def current_snapshot
  @current_snapshot
end
Instance Method Details
#build_table_meta(columns) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 122
#
# Builds the per-table meta hash from the rows of the table meta query.
#
# @param columns [Enumerable<Hash>] rows with at least the string keys
#   'table_name' and 'column_name'.
# @return [Hash{Symbol => Hash}] meta information keyed by table name. For a
#   table whose column definitions fail to parse, only :raw_columns,
#   :table_name (a Symbol in this case) and :table_def_err are set.
def build_table_meta(columns)
  ret = Hash.new { |h, k| h[k] = {} }

  # Put ret[<table-name-sym>][:raw_columns]
  # A column can appear in several rows, so every column name maps to an
  # array of rows.
  columns.each do |col|
    column_name = col['column_name'].to_sym
    table_name = col['table_name'].to_sym
    table_meta = ret[table_name]
    table_meta[:raw_columns] ||= Hash.new { |h, k| h[k] = [] }
    table_meta[:raw_columns][column_name] << col
  end

  ret.each do |table_name, table_meta|
    begin
      table_def = FlydataCore::TableDef::PostgresqlTableDef.create(
        table_meta[:raw_columns].values, @options)
    rescue FlydataCore::TableDefError => e
      table_meta.merge!(
        table_name: table_name,
        table_def_err: e,
      )
      # Skip when getting an error when parsing the columns
      next
    end

    primary_keys = []
    pk_positions = []
    # Positions are 1-based indexes into table_def.columns and are stored
    # as strings.
    table_def.columns.each.with_index(1) do |col, index|
      col_name = col[:column]
      if col[:primary_key]
        primary_keys << col_name
        pk_positions << index.to_s
      end
    end

    table_meta.merge!(
      table_name: table_name.to_s,
      table_schema: @schema,
      primary_keys: primary_keys,
      pk_positions: pk_positions,
      #max_row_size: max_row_size, #TODO: calculation
      #max_num_rows_per_query: max_row_size + 128, #TODO: calculation
      max_num_rows_per_query: DEFAULT_MAX_FETCH_RECORD_SIZE,
      columns: table_def.columns,
      table_def: table_def,
    )
  end

  ret
end
#calc_column_size(column) ⇒ Object
173 174 175 176 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 173
#
# Returns the size used for a column. Currently a fixed placeholder value:
# the argument is not inspected.
#
# @param column [Object] column definition (unused for now).
# @return [Integer] always 124.
def calc_column_size(column)
  #TODO: Implement the check logic based on column type
  124
end
#reload(pg_client = nil) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 95
#
# Fetches the table meta rows and the current transaction snapshot, storing
# them in @table_meta and @current_snapshot.
#
# @param pg_client [Object, nil] client responding to +query+. When nil, a
#   FlydataCore::Postgresql::PGClient is created from @options and closed
#   before returning; a client passed in by the caller is left open.
# @return [self]
def reload(pg_client = nil)
  # Without an explicit schema, the placeholder inside `IN (...)` is filled
  # with a subquery that resolves to the connection's current schema.
  schema = @schema ? "'#{@schema}'" : "select current_schema"
  # NOTE(review): database, schema and table names are interpolated into the
  # SQL without escaping - confirm they always come from trusted configuration.
  sql = GET_TABLE_META_SQL % {
    database: @database,
    schema: schema,
    tables: @tables.collect { |t| "'#{t}'" }.join(','),
  }

  conn = pg_client
  if conn.nil?
    local_conn = conn = FlydataCore::Postgresql::PGClient.new(@options)
  end

  # Set table_meta
  columns = conn.query(sql)
  @table_meta = build_table_meta(columns)

  # Set current snapshot
  current_snapshot_str = conn.
    query(GET_CURRENT_SNAPSHOT_SQL).first['txid_current_snapshot']
  @current_snapshot = FlydataCore::Postgresql::Snapshot.new(current_snapshot_str)

  self
ensure
  # Best-effort close of the connection opened here; close errors are ignored.
  local_conn.close rescue nil if local_conn
end