Class: Flydata::SourceOracle::TableMeta

Inherits:
TableMeta
  • Object
show all
Defined in:
lib/flydata/source_oracle/table_meta.rb

Overview

Fetch and keep table meta information

<table-name(Symbol)>:
  table_name: <String>               # Table name
  table_schema: <String> or nil      # Schema name
  primary_keys: <Array of String>    # Set primary key names. ex: ['group_id', 'category_id']
  pk_positions: <Array of String>    # 1-based ordinal positions of the primary keys, stored as strings. ex: ['1', '3']
  max_row_size: <Integer>            # byte, calculated based on column size
  max_num_rows_per_query: <Integer>  # max number of rows per query
  raw_columns: <Hash>                # raw information schema data
  columns: table_def.columns
  table_def:
    columns:
      table:
      column:
      type:
      not_null:
      primary_key:
      default:
      column_size: (new)  # Set in `OracleTableDef.parse_one_column_def`

Constant Summary collapse

# SQL template that lists every column of every table owned by one schema.
# The %{schema} placeholder appears three times and must be filled with an
# already-quoted owner literal (e.g. 'SCOTT'). Each result row describes one
# column (name, ordinal position, data type, default, nullability, length,
# precision, scale) and, through the two LEFT OUTER JOINs, whether it takes
# part in a primary-key ('P') or unique ('U') constraint. Rows are ordered by
# table name and ordinal position.
GET_TABLE_META_SQL =
"SELECT\n   tbls.OWNER                        AS schema\n  ,cols.TABLE_NAME                   AS table_name\n  ,cols.COLUMN_NAME                  AS column_name\n  ,cols.COLUMN_ID                    AS ordinal_position\n  ,(CASE\n    WHEN pk_cols.CONSTRAINT_NAME IS NULL THEN\n      'f'\n    ELSE\n      't'\n    END)                             AS is_primary\n  ,pk_cols.CONSTRAINT_NAME           AS pk_constraint_name\n  ,pk_cols.KEY_ORDINAL               AS pk_key_ordinal\n  ,(CASE\n    WHEN uq_cols.CONSTRAINT_NAME IS NULL THEN\n      'f'\n    ELSE\n      't'\n   END)                              AS is_unique\n  ,uq_cols.CONSTRAINT_NAME           AS uq_constraint_name\n  ,uq_cols.KEY_ORDINAL               AS uq_key_ordinal\n  ,cols.DATA_TYPE                    AS column_data_type\n  ,cols.DATA_DEFAULT                 AS column_default\n  ,cols.NULLABLE                     AS is_nullable\n  ,cols.DATA_LENGTH                  AS max_length\n  ,cols.DATA_PRECISION               AS precision\n  ,cols.DATA_SCALE                   AS scale\nFROM\n  (SELECT * FROM ALL_TABLES WHERE OWNER = %{schema}) tbls\n  INNER JOIN ALL_TAB_COLUMNS cols\n    ON  cols.OWNER      = tbls.OWNER\n    AND cols.TABLE_NAME = tbls.TABLE_NAME\n  LEFT OUTER JOIN (\n    SELECT\n       cons_inner.CONSTRAINT_NAME  AS constraint_name\n      ,cons_columns_inner.POSITION AS key_ordinal\n      ,tbls_inner.TABLE_NAME       AS table_name\n      ,cols_inner.COLUMN_NAME      AS col_name\n      ,cols_inner.COLUMN_ID        AS col_id\n    FROM\n      (SELECT * FROM ALL_TABLES WHERE OWNER = %{schema}) tbls_inner\n      INNER JOIN ALL_TAB_COLUMNS cols_inner\n        ON  cols_inner.OWNER      = tbls_inner.OWNER\n        AND cols_inner.TABLE_NAME = tbls_inner.TABLE_NAME\n      INNER JOIN ALL_CONSTRAINTS cons_inner\n        ON  cons_inner.OWNER           = cols_inner.OWNER\n        AND cons_inner.CONSTRAINT_TYPE = 'P'\n        AND cons_inner.TABLE_NAME      = tbls_inner.TABLE_NAME\n      INNER JOIN ALL_CONS_COLUMNS 
cons_columns_inner\n        ON  cons_columns_inner.OWNER           = cons_inner.OWNER\n        AND cons_columns_inner.CONSTRAINT_NAME = cons_inner.CONSTRAINT_NAME\n        AND cons_columns_inner.TABLE_NAME      = cons_inner.TABLE_NAME\n        AND cons_columns_inner.COLUMN_NAME     = cols_inner.COLUMN_NAME\n  ) pk_cols\n    ON  pk_cols.TABLE_NAME = cols.TABLE_NAME\n    AND pk_cols.COL_NAME   = cols.COLUMN_NAME\n    AND pk_cols.COL_ID     = cols.COLUMN_ID\n  LEFT OUTER JOIN (\n    SELECT\n       cons_inner.CONSTRAINT_NAME  AS constraint_name\n      ,cons_columns_inner.POSITION AS key_ordinal\n      ,tbls_inner.TABLE_NAME       AS table_name\n      ,cols_inner.COLUMN_NAME      AS col_name\n      ,cols_inner.COLUMN_ID        AS col_id\n    FROM\n      (SELECT * FROM ALL_TABLES WHERE OWNER = %{schema}) tbls_inner\n      INNER JOIN ALL_TAB_COLUMNS cols_inner\n        ON  cols_inner.OWNER      = tbls_inner.OWNER\n        AND cols_inner.TABLE_NAME = tbls_inner.TABLE_NAME\n      INNER JOIN ALL_CONSTRAINTS cons_inner\n        ON  cons_inner.OWNER           = cols_inner.OWNER\n        AND cons_inner.CONSTRAINT_TYPE = 'U'\n        AND cons_inner.TABLE_NAME      = tbls_inner.TABLE_NAME\n      INNER JOIN ALL_CONS_COLUMNS cons_columns_inner\n        ON  cons_columns_inner.OWNER           = cons_inner.OWNER\n        AND cons_columns_inner.CONSTRAINT_NAME = cons_inner.CONSTRAINT_NAME\n        AND cons_columns_inner.TABLE_NAME      = cons_inner.TABLE_NAME\n        AND cons_columns_inner.COLUMN_NAME     = cols_inner.COLUMN_NAME\n   ) uq_cols\n     ON  uq_cols.TABLE_NAME = cols.TABLE_NAME\n     AND uq_cols.COL_NAME   = cols.COLUMN_NAME\n     AND uq_cols.COL_ID     = cols.COLUMN_ID\nORDER BY\n  table_name, ordinal_position\n"
# Query that reads the current_scn column from v$database; #reload runs it
# and stores the integer value in @current_scn.
GET_CURRENT_SCN_SQL =
"SELECT current_scn from v$database"
# Value stored as :max_num_rows_per_query for every table by #build_table_meta.
DEFAULT_MAX_FETCH_RECORD_SIZE =
50000

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from TableMeta

#[]

Constructor Details

#initialize(options, tables, schema = nil) ⇒ TableMeta

DEFAULT_MAX_FETCH_RECORD_SIZE = 8



125
126
127
128
129
130
131
# File 'lib/flydata/source_oracle/table_meta.rb', line 125

# Store the connection options and table list, and resolve the database and
# schema names from the options.
#
# @param options [Hash] connection settings; looked up with Symbol keys
#   (:dbname, :database, :schema, :username) and the String keys
#   'database', 'schema' and 'username'
# @param tables [Object] table list, stored as given in @tables
# @param schema [Object, nil] explicit schema name; when nil/false the schema
#   falls back to options[:schema], options['schema'], then the username
def initialize(options, tables, schema = nil)
  @options = options

  # Database name: first truthy of :dbname, :database, 'database'.
  db_name = options[:dbname]
  db_name ||= options[:database]
  db_name ||= options['database']
  @database = db_name

  @tables = tables

  login = options[:username] || options['username']

  # Schema name: explicit argument wins, then the options, then the login
  # name. The result is always a stripped, upper-cased String.
  owner = schema
  owner ||= options[:schema]
  owner ||= options['schema']
  owner ||= login
  @schema = owner.to_s.strip.upcase
end

Instance Attribute Details

#current_scn ⇒ Object (readonly)

Returns the value of attribute current_scn.



133
134
135
# File 'lib/flydata/source_oracle/table_meta.rb', line 133

# Reader for @current_scn. The value is assigned by #reload (as an Integer,
# via to_i); #initialize does not set it, so this returns nil until #reload
# has completed successfully.
def current_scn
  @current_scn
end

Instance Method Details

#build_table_meta(columns) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/flydata/source_oracle/table_meta.rb', line 164

# Build the per-table meta hash from a cursor of column-metadata rows.
#
# @param columns [#fetch_hash] cursor whose fetch_hash returns one row Hash
#   per call (with at least 'TABLE_NAME' and 'COLUMN_NAME' keys) and a falsy
#   value once exhausted
# @return [Hash{Symbol => Hash}] keyed by table name. On success each entry
#   holds :raw_columns, :table_name (String), :table_schema, :primary_keys,
#   :pk_positions (1-based positions as Strings), :max_num_rows_per_query,
#   :columns and :table_def. When the table definition cannot be parsed the
#   entry holds only :raw_columns, :table_name (Symbol) and :table_def_err.
def build_table_meta(columns)
  ret = Hash.new { |h, k| h[k] = {} }

  # Put ret[<table-name-sym>][:raw_columns]; rows are grouped per column name.
  while (row = columns.fetch_hash)
    column_name = row['COLUMN_NAME'].to_sym
    table_name = row['TABLE_NAME'].to_sym
    t_meta = ret[table_name]
    t_meta[:raw_columns] ||= Hash.new { |h, k| h[k] = [] }
    t_meta[:raw_columns][column_name] << row
  end

  ret.each do |table_name, t_meta|
    begin
      table_def = FlydataCore::TableDef::OracleTableDef.create(
        t_meta[:raw_columns].values, @options
      )
    rescue FlydataCore::TableDefError => e
      # Record the error and skip the table when its columns cannot be parsed.
      t_meta.merge!(
        table_name: table_name,
        table_def_err: e,
      )
      next
    end

    primary_keys = []
    pk_positions = []
    table_def.columns.each.with_index(1) do |col, index|
      # The metadata query exposes no CONSTRAINT_TYPE column, so testing only
      # col["CONSTRAINT_TYPE"] never matched and the key lists stayed empty.
      # Use the :primary_key flag of the column definition; the legacy check
      # is kept as a fallback.
      # NOTE(review): assumes :primary_key is truthy only for key columns —
      # confirm against OracleTableDef.
      next unless col[:primary_key] || col["CONSTRAINT_TYPE"] == 'P'

      primary_keys << col[:column]
      pk_positions << index.to_s
    end

    t_meta.merge!(
      table_name: table_name.to_s,
      table_schema: @schema,
      primary_keys: primary_keys,
      pk_positions: pk_positions,
      max_num_rows_per_query: DEFAULT_MAX_FETCH_RECORD_SIZE,
      columns: table_def.columns,
      table_def: table_def,
    )
  end

  ret
end

#calc_column_size(column) ⇒ Object



213
214
215
216
# File 'lib/flydata/source_oracle/table_meta.rb', line 213

# Size estimate for one column. Currently a stub: the argument is not
# inspected and the same fixed value is returned for every column.
#
# @param column [Object] column definition (unused for now)
# @return [Integer] always 124
def calc_column_size(column)
  # TODO: Implement the check logic based on column type
  fixed_size = 124
  fixed_size
end

#reload(oracle_client = nil) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/flydata/source_oracle/table_meta.rb', line 135

# Re-read the table meta information and the current SCN from Oracle.
#
# @param oracle_client [#query, nil] connection to reuse. When nil a
#   FlydataCore::Oracle::OracleClient is created from @options and closed
#   again before returning; a connection passed in by the caller is left open.
# @return [self]
# @raise [RuntimeError] "Invalid scn is retrieved" when the SCN query yields
#   no row or a nil value
def reload(oracle_client = nil)
  conn = oracle_client
  # local_conn is only set for a connection opened here, so the ensure
  # clause never closes a caller-owned connection.
  local_conn = conn = FlydataCore::Oracle::OracleClient.new(@options) if conn.nil?

  # The SQL template only contains %{schema}; the value is passed as a quoted
  # literal.
  # NOTE(review): the query is not restricted to @tables, so meta information
  # is built for every table owned by the schema.
  sql = GET_TABLE_META_SQL % { schema: "'#{@schema}'" }

  # Set table_meta
  @table_meta = build_table_meta(conn.query(sql))

  # Set current scn. Guard against a missing row so that the intended error
  # below is raised instead of a NoMethodError on nil.
  scn_row = conn.query(GET_CURRENT_SCN_SQL).fetch
  current_scn_str = scn_row && scn_row.first

  raise "Invalid scn is retrieved" if current_scn_str.nil?
  @current_scn = current_scn_str.to_i

  self
ensure
  if local_conn
    begin
      local_conn.close
    rescue StandardError
      # Best-effort cleanup: a failing close must not mask the real result.
      nil
    end
  end
end