Class: Tapsoob::DataStream::KeyedPartition

Inherits:
Base
  • Object
show all
Defined in:
lib/tapsoob/data_stream/keyed_partition.rb

Overview

DataStream variant for PK-based range partitioning

Constant Summary

Constants inherited from Base

Base::DEFAULT_CHUNKSIZE

Instance Attribute Summary

Attributes inherited from Base

#db, #options, #state

Instance Method Summary collapse

Methods inherited from Base

#encode_rows, #error, #error=, factory, #fetch, #fetch_data_from_database, #fetch_data_to_database, #fetch_file, #import_rows, #increment, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats, #verify_stream

Constructor Details

#initialize(db, state, opts = {}) ⇒ KeyedPartition

Returns a new instance of KeyedPartition.



8
9
10
11
12
13
14
15
16
# File 'lib/tapsoob/data_stream/keyed_partition.rb', line 8

# Builds a keyed-partition stream and makes sure the partition-tracking
# keys exist in the state hash.
#
# All three arguments are forwarded unchanged to the superclass
# initializer; whatever @state holds afterwards takes precedence over the
# defaults supplied here.
def initialize(db, state, opts = {})
  super(db, state, opts)
  partition_defaults = {
    # [min_pk, max_pk] bounds of this partition
    :partition_range => nil,
    # last primary key value fetched
    :last_pk => nil
  }
  @state = partition_defaults.merge(@state)
end

Instance Method Details

#complete? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
59
60
61
# File 'lib/tapsoob/data_stream/keyed_partition.rb', line 56

# Reports whether this partition has been fully read.
#
# A stream without a :partition_range has nothing to fetch and counts as
# complete. Otherwise the partition is complete once state[:last_pk] has
# reached (or passed) the upper bound of the range.
#
# @return [Boolean] strictly true or false, never nil
def complete?
  range = state[:partition_range]
  return true if range.nil?

  last_pk = state[:last_pk]
  # No key has been recorded yet, so the partition cannot be finished.
  return false unless last_pk

  # Only the upper bound of the range matters for completion.
  _min_pk, max_pk = range
  last_pk >= max_pk
end

#fetch_rows ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/tapsoob/data_stream/keyed_partition.rb', line 22

# Fetches the next chunk of rows belonging to this partition.
#
# Rows are read in order_by order, restricted to primary keys inside
# state[:partition_range] and strictly greater than the last key seen, with
# at most state[:chunksize] rows per call. state[:last_pk] is advanced to
# the last row's key, or to the upper bound when the query returns nothing.
#
# @return [Object] {} when no partition range is set; otherwise whatever
#   Tapsoob::Utils.format_data returns for the fetched rows
def fetch_rows
  range = state[:partition_range]
  return {} if range.nil?

  # The count is taken on the first fetch only and reused afterwards.
  state[:size] ||= table.count

  lower, upper = range
  limit = state[:chunksize]

  # Before anything has been fetched there is no cursor, so start just
  # below the lower bound (this relies on the bound supporting `- 1`).
  cursor = state[:last_pk] || (lower - 1)

  # PK range filter: after the cursor and within [lower, upper].
  pk_column = Sequel.identifier(primary_key)
  in_partition = (pk_column > cursor) & (pk_column >= lower) & (pk_column <= upper)

  rows = table.order(*order_by).filter(in_partition).limit(limit).all

  # Remember where to resume; an empty result marks the partition as done.
  state[:last_pk] = rows.any? ? rows.last[primary_key] : upper

  Tapsoob::Utils.format_data(
    db,
    rows,
    :string_columns => string_columns,
    :schema => db.schema(table_name),
    :table => table_name
  )
end

#primary_key ⇒ Object



18
19
20
# File 'lib/tapsoob/data_stream/keyed_partition.rb', line 18

# Returns the first entry of Tapsoob::Utils.order_by(db, table_name).
#
# The value is cached in @primary_key once a truthy result has been
# obtained, so later calls skip the lookup.
def primary_key
  return @primary_key if @primary_key

  @primary_key = Tapsoob::Utils.order_by(db, table_name).first
end