Class: Flydata::SourceOracle::TableMeta

Inherits:
TableMeta
  • Object
show all
Defined in:
lib/flydata/source_oracle/table_meta.rb

Overview

Fetch and keep table meta information

<table-name(Symbol)>:
  table_name: <String>               # Table name
  table_schema: <String> or nil      # Schema name
  primary_keys: <Array of String>    # Set primary key names. ex: ['group_id', 'category_id']
  pk_positions: <Array of Integer>   # Set the ordinal position of primary keys. ex: [1, 3]
  max_row_size: <Integer>            # byte, calculated based on column size
  max_num_rows_per_query>: <Integer> # max number of rows per query
  raw_columns: <Hash>                # raw information schema data
  columns: table_def.columns
  table_def:
    columns:
      table:
      column:
      type:
      not_null:
      primary_key:
      default:
      column_size: (new)  # Set in `OracleTableDef.parse_one_column_def`

Constant Summary collapse

GET_TABLE_META_SQL =
<<EOS
SELECT
   tbls.OWNER                        AS schema
  ,cols.TABLE_NAME                   AS table_name
  ,cols.COLUMN_NAME                  AS column_name
  ,cols.COLUMN_ID                    AS ordinal_position
  ,(CASE
    WHEN pk_cols.CONSTRAINT_NAME IS NULL THEN
      'f'
    ELSE
      't'
    END)                             AS is_primary
  ,pk_cols.CONSTRAINT_NAME           AS pk_constraint_name
  ,pk_cols.KEY_ORDINAL               AS pk_key_ordinal
  ,(CASE
    WHEN uq_cols.CONSTRAINT_NAME IS NULL THEN
      'f'
    ELSE
      't'
   END)                              AS is_unique
  ,uq_cols.CONSTRAINT_NAME           AS uq_constraint_name
  ,uq_cols.KEY_ORDINAL               AS uq_key_ordinal
  ,cols.DATA_TYPE                    AS column_data_type
  ,cols.DATA_DEFAULT                 AS column_default
  ,cols.NULLABLE                     AS is_nullable
  ,cols.DATA_LENGTH                  AS max_length
  ,cols.DATA_PRECISION               AS precision
  ,cols.DATA_SCALE                   AS scale
FROM
  (SELECT * FROM ALL_TABLES WHERE OWNER = %{schema}) tbls
  INNER JOIN ALL_TAB_COLUMNS cols
    ON  cols.OWNER      = tbls.OWNER
    AND cols.TABLE_NAME = tbls.TABLE_NAME
  LEFT OUTER JOIN (
    SELECT
       cons_inner.CONSTRAINT_NAME  AS constraint_name
      ,cons_columns_inner.POSITION AS key_ordinal
      ,tbls_inner.TABLE_NAME       AS table_name
      ,cols_inner.COLUMN_NAME      AS col_name
      ,cols_inner.COLUMN_ID        AS col_id
    FROM
      (SELECT * FROM ALL_TABLES WHERE OWNER = %{schema}) tbls_inner
      INNER JOIN ALL_TAB_COLUMNS cols_inner
        ON  cols_inner.OWNER      = tbls_inner.OWNER
        AND cols_inner.TABLE_NAME = tbls_inner.TABLE_NAME
      INNER JOIN ALL_CONSTRAINTS cons_inner
        ON  cons_inner.OWNER           = cols_inner.OWNER
        AND cons_inner.CONSTRAINT_TYPE = 'P'
        AND cons_inner.TABLE_NAME      = tbls_inner.TABLE_NAME
      INNER JOIN ALL_CONS_COLUMNS cons_columns_inner
        ON  cons_columns_inner.OWNER           = cons_inner.OWNER
        AND cons_columns_inner.CONSTRAINT_NAME = cons_inner.CONSTRAINT_NAME
        AND cons_columns_inner.TABLE_NAME      = cons_inner.TABLE_NAME
        AND cons_columns_inner.COLUMN_NAME     = cols_inner.COLUMN_NAME
  ) pk_cols
    ON  pk_cols.TABLE_NAME = cols.TABLE_NAME
    AND pk_cols.COL_NAME   = cols.COLUMN_NAME
    AND pk_cols.COL_ID     = cols.COLUMN_ID
  LEFT OUTER JOIN (
    SELECT
       cons_inner.CONSTRAINT_NAME  AS constraint_name
      ,cons_columns_inner.POSITION AS key_ordinal
      ,tbls_inner.TABLE_NAME       AS table_name
      ,cols_inner.COLUMN_NAME      AS col_name
      ,cols_inner.COLUMN_ID        AS col_id
    FROM
      (SELECT * FROM ALL_TABLES WHERE OWNER = %{schema}) tbls_inner
      INNER JOIN ALL_TAB_COLUMNS cols_inner
        ON  cols_inner.OWNER      = tbls_inner.OWNER
        AND cols_inner.TABLE_NAME = tbls_inner.TABLE_NAME
      INNER JOIN ALL_CONSTRAINTS cons_inner
        ON  cons_inner.OWNER           = cols_inner.OWNER
        AND cons_inner.CONSTRAINT_TYPE = 'U'
        AND cons_inner.TABLE_NAME      = tbls_inner.TABLE_NAME
      INNER JOIN ALL_CONS_COLUMNS cons_columns_inner
        ON  cons_columns_inner.OWNER           = cons_inner.OWNER
        AND cons_columns_inner.CONSTRAINT_NAME = cons_inner.CONSTRAINT_NAME
        AND cons_columns_inner.TABLE_NAME      = cons_inner.TABLE_NAME
        AND cons_columns_inner.COLUMN_NAME     = cols_inner.COLUMN_NAME
   ) uq_cols
     ON  uq_cols.TABLE_NAME = cols.TABLE_NAME
     AND uq_cols.COL_NAME   = cols.COLUMN_NAME
     AND uq_cols.COL_ID     = cols.COLUMN_ID
ORDER BY
  table_name, ordinal_position
EOS
GET_CURRENT_SCN_SQL =
"SELECT current_scn from v$database"
DEFAULT_MAX_FETCH_RECORD_SIZE =
50000

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from TableMeta

#[]

Constructor Details

#initialize(options, tables, schema = nil) ⇒ TableMeta

DEFAULT_MAX_FETCH_RECORD_SIZE = 8



125
126
127
128
129
130
131
# File 'lib/flydata/source_oracle/table_meta.rb', line 125

def initialize(options, tables, schema = nil)
  @options = options
  @database = options[:dbname] || options[:database] || options['database']
  @tables = tables
  username = options[:username] || options['username']
  @schema = (schema || options[:schema] || options['schema'] || username).to_s.strip.upcase
end

Instance Attribute Details

#current_scnObject (readonly)

Returns the value of attribute current_scn.



133
134
135
# File 'lib/flydata/source_oracle/table_meta.rb', line 133

def current_scn
  @current_scn
end

Instance Method Details

#build_table_meta(columns) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/flydata/source_oracle/table_meta.rb', line 164

def build_table_meta(columns)
  ret = Hash.new{|h,k| h[k]={} }

  # Put ret[<table-name-sym>][:raw_columns]
  while (col = columns.fetch_hash)
    column_name = col['COLUMN_NAME'].to_sym
    table_name = col['TABLE_NAME'].to_sym
    t_meta = ret[table_name]
    t_meta[:raw_columns] = Hash.new {|h,k| h[k] = []} unless t_meta[:raw_columns]
    t_meta[:raw_columns][column_name] << col
  end

  ret.each do |table_name, t_meta|
    begin
      table_def = FlydataCore::TableDef::OracleTableDef.create(
                  t_meta[:raw_columns].values, @options)
    rescue FlydataCore::TableDefError => e
      t_meta.merge!(
        table_name: table_name,
        table_def_err: e,
      )
      # Skip when getting an error when parsing the columns
      next
    end

    primary_keys = []
    pk_positions = []
    table_def.columns.each.with_index(1) do |col, index|
      col_name = col[:column]
      if col["CONSTRAINT_TYPE"] == 'P'
        primary_keys << col_name
        pk_positions << index.to_s
      end
    end

    t_meta.merge!(
      table_name: table_name.to_s,
      table_schema: @schema,
      primary_keys: primary_keys,
      pk_positions: pk_positions,
      max_num_rows_per_query: DEFAULT_MAX_FETCH_RECORD_SIZE,
      columns: table_def.columns,
      table_def: table_def,
    )
  end

  ret
end

#calc_column_size(column) ⇒ Object



213
214
215
216
# File 'lib/flydata/source_oracle/table_meta.rb', line 213

def calc_column_size(column)
  #TODO: Implement the check logic based on column type
  124
end

#reload(oracle_client = nil) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/flydata/source_oracle/table_meta.rb', line 135

def reload(oracle_client = nil)
  schema = {
    database: "'#{@database}'",
    schema: "'#{@schema}'",
    tables: @tables.collect{|t| "'#{t}'"}.join(','),
  }

  conn = oracle_client
  if conn.nil?
    local_conn = conn = FlydataCore::Oracle::OracleClient.new(@options)
  end

  sql = GET_TABLE_META_SQL % schema

  # Set table_meta
  columns = conn.query(sql)
  @table_meta = build_table_meta(columns)

  # Set current scn
  current_scn_str = conn.query(GET_CURRENT_SCN_SQL).fetch.first

  raise "Invalid scn is retrieved" if current_scn_str.nil?
  @current_scn = current_scn_str.to_i

  self
ensure
  local_conn.close rescue nil if local_conn
end