Class: Flydata::SourcePostgresql::TableMeta
- Defined in:
- lib/flydata/source_postgresql/table_meta.rb
Overview
Fetch and keep table meta information
<table-name(Symbol)>:
table_name: <String> # Table name
table_schema: <String> or nil # Schema name
primary_keys: <Array of String> # Set primary key names. ex: ['group_id', 'category_id']
pk_positions: <Array of String> # Set the 1-based ordinal position of primary keys, stored as strings. ex: ['1', '3']
max_row_size: <Integer> # byte, calculated based on column size
max_num_rows_per_query: <Integer> # max number of rows per query
raw_columns: <Hash> # raw information schema data
columns: table_def.columns
table_def:
columns:
table:
column:
type:
not_null:
primary_key:
default:
column_size: (new) # Set in `PostgresqlTableDef.parse_one_column_def`
Constant Summary collapse
- GET_TABLE_META_SQL =
<<EOT SELECT c.table_catalog, c.table_schema, c.table_name, c.column_name, c.ordinal_position, c.column_default, c.is_nullable, c.data_type, c.character_maximum_length, c.character_octet_length, CASE WHEN c.data_type='money' THEN 19 ELSE c.numeric_precision END AS numeric_precision, c.numeric_precision_radix, CASE WHEN c.data_type='money' THEN ( SELECT CASE WHEN scale IS NULL THEN 0 ELSE scale END FROM (SELECT length(substring('999999'::money::char varying, '\.([0-9]+)$')) AS scale) s) ELSE c.numeric_scale END AS numeric_scale, c.datetime_precision, i.indisprimary AS is_primary FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) RIGHT JOIN (SELECT (table_catalog ||'.'|| table_schema ||'.'|| table_name)::regclass AS regid, * FROM information_schema.columns ) c ON i.indrelid = c.regid AND a.attname = c.column_name WHERE c.table_catalog = '%{database}' AND c.table_schema IN (%{schema}) AND c.table_name IN (%{tables}) ORDER BY c.table_name, c.ordinal_position, i.indisprimary asc; EOT
- GET_CURRENT_SNAPSHOT_SQL =
"SELECT txid_current_snapshot();"
- DEFAULT_MAX_FETCH_RECORD_SIZE =
50000
Instance Attribute Summary collapse
-
#current_snapshot ⇒ Object
readonly
Returns the value of attribute current_snapshot.
Instance Method Summary collapse
- #build_table_meta(columns) ⇒ Object
- #calc_column_size(column) ⇒ Object
-
#initialize(options, tables, schema = nil) ⇒ TableMeta
constructor
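Stores the connection options, table list and schema name. (The text "DEFAULT_MAX_FETCH_RECORD_SIZE = 8" shown here appears to be a leftover commented-out constant picked up as the docstring; the constant listed above is 50000.)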
- #reload(pg_client = nil) ⇒ Object
Methods inherited from TableMeta
Constructor Details
#initialize(options, tables, schema = nil) ⇒ TableMeta
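Stores the connection options, table list and schema name. (The text "DEFAULT_MAX_FETCH_RECORD_SIZE = 8" shown here appears to be a leftover commented-out constant picked up as the docstring; the constant listed above is 50000.)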
77 78 79 80 81 82 83 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 77

# Stores the connection options, the table list and the schema name.
#
# @param options [Hash] connection options; read here with the keys
#   :dbname, :database, 'database', :schema and 'schema'
# @param tables [Object] table names, stored as given
# @param schema [String, nil] schema name; when nil the value is taken
#   from options[:schema] or options['schema']
def initialize(options, tables, schema = nil)
  @options = options
  # First non-nil database name wins: :dbname, then :database, then 'database'.
  @database = options[:dbname] || options[:database] || options['database']
  @tables = tables
  # Normalize the schema: surrounding whitespace is stripped and a blank
  # value is stored as nil.
  @schema = (schema || options[:schema] || options['schema']).to_s.strip
  @schema = @schema.empty? ? nil : @schema
end
Instance Attribute Details
#current_snapshot ⇒ Object (readonly)
Returns the value of attribute current_snapshot.
85 86 87 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 85

# Returns the value of attribute current_snapshot.
#
# @return [Object, nil] the value held in @current_snapshot (nil until it
#   has been assigned)
def current_snapshot
  @current_snapshot
end
Instance Method Details
#build_table_meta(columns) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 114

# Builds the per-table meta hash from column rows.
#
# @param columns [Enumerable] rows that are read with the string keys
#   'column_name' and 'table_name'
# @return [Hash] keyed by table name (Symbol). Each value is a Hash that
#   always has :raw_columns and :table_name. When the table definition was
#   parsed it also has :table_schema, :primary_keys, :pk_positions,
#   :max_num_rows_per_query, :columns and :table_def; when parsing raised
#   FlydataCore::TableDefError it has :table_def_err instead.
def build_table_meta(columns)
  ret = Hash.new { |h, k| h[k] = {} }

  # Put ret[<table-name-sym>][:raw_columns]
  columns.each do |col|
    column_name = col['column_name'].to_sym
    table_name = col['table_name'].to_sym
    meta = ret[table_name]
    meta[:raw_columns] ||= {}
    meta[:raw_columns][column_name] = col
  end

  ret.each do |table_name, meta|
    begin
      table_def = FlydataCore::TableDef::PostgresqlTableDef.create(
        meta[:raw_columns].values, @options
      )
    rescue FlydataCore::TableDefError => e
      # NOTE(review): table_name stays a Symbol in this branch, while the
      # success branch below stores a String - confirm callers expect that.
      meta.merge!(
        table_name: table_name,
        table_def_err: e
      )
      # Skip when getting an error when parsing the columns
      next
    end

    primary_keys = []
    pk_positions = []
    # Positions are 1-based and kept as strings.
    table_def.columns.each.with_index(1) do |col, index|
      next unless col[:primary_key]

      primary_keys << col[:column]
      pk_positions << index.to_s
    end

    meta.merge!(
      table_name: table_name.to_s,
      table_schema: @schema,
      primary_keys: primary_keys,
      pk_positions: pk_positions,
      #max_row_size: max_row_size, #TODO: calculation
      #max_num_rows_per_query: max_row_size + 128, #TODO: calculation
      max_num_rows_per_query: DEFAULT_MAX_FETCH_RECORD_SIZE,
      columns: table_def.columns,
      table_def: table_def
    )
  end

  ret
end
#calc_column_size(column) ⇒ Object
165 166 167 168 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 165

# Returns the size used for a column. Currently a fixed placeholder: the
# argument is not inspected.
#
# @param column [Object] unused for now
# @return [Integer] always 124
def calc_column_size(column)
  #TODO: Implement the check logic based on column type
  124
end
#reload(pg_client = nil) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/flydata/source_postgresql/table_meta.rb', line 87

# Queries the table meta information and the current snapshot, and stores
# them in @table_meta and @current_snapshot.
#
# @param pg_client [Object, nil] client responding to #query. When nil, a
#   FlydataCore::Postgresql::PGClient is created from @options and closed
#   before returning; a client passed in by the caller is never closed here.
# @return [self]
def reload(pg_client = nil)
  # Without an explicit schema the IN (...) clause receives a subquery.
  schema = @schema ? "'#{@schema}'" : "select current_schema"
  # NOTE(review): database, schema and table names are interpolated into the
  # SQL text without escaping - confirm they never contain a single quote.
  sql = GET_TABLE_META_SQL % {
    database: @database,
    schema: schema,
    tables: @tables.collect { |t| "'#{t}'" }.join(','),
  }

  conn = pg_client
  if conn.nil?
    local_conn = conn = FlydataCore::Postgresql::PGClient.new(@options)
  end

  # Set table_meta
  columns = conn.query(sql)
  @table_meta = build_table_meta(columns)

  # Set current snapshot
  current_snapshot_str =
    conn.query(GET_CURRENT_SNAPSHOT_SQL).first['txid_current_snapshot']
  @current_snapshot = FlydataCore::Postgresql::Snapshot.new(current_snapshot_str)

  self
ensure
  # Best-effort close of the connection opened by this method only; a failure
  # while closing is deliberately ignored.
  if local_conn
    begin
      local_conn.close
    rescue StandardError
      nil
    end
  end
end