Class: Tapsoob::DataStream::Keyed

Inherits:
  Base
    • Object

Defined in:
  lib/tapsoob/data_stream/keyed.rb

Constant Summary

Constants inherited from Base

Base::DEFAULT_CHUNKSIZE

Instance Attribute Summary

Attributes inherited from Base

#db, #options, #state

Class Method Summary

Instance Method Summary

Methods inherited from Base

#complete?, #encode_rows, #error, #error=, factory, #fetch, #fetch_data_from_database, #fetch_data_to_database, #fetch_file, #fetch_rows, #import_rows, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats

Constructor Details

#initialize(db, state, opts = {}) ⇒ Keyed

Returns a new instance of Keyed.



# File 'lib/tapsoob/data_stream/keyed.rb', line 9

def initialize(db, state, opts = {})
  super(db, state, opts)
  @state = { :primary_key => order_by(state[:table_name]).first, :filter => 0 }.merge(@state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @buffer = []
end
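
A minimal usage sketch, assuming `require 'tapsoob'` loads the library and a Sequel connection is available; the connection URL, table name, and chunk size below are illustrative, not values from the gem's documentation:

require 'sequel'
require 'tapsoob'

db = Sequel.connect("postgres://localhost/mydb")   # assumed connection URL
stream = Tapsoob::DataStream::Keyed.new(db, { :table_name => :users, :chunksize => 500 })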

Instance Attribute Details

#buffer ⇒ Object

Returns the value of attribute buffer.



# File 'lib/tapsoob/data_stream/keyed.rb', line 7

def buffer
  @buffer
end

Class Method Details

.calculate_pk_ranges(db, table_name, num_partitions) ⇒ Object

Calculates primary-key ranges for partitioning a table.



# File 'lib/tapsoob/data_stream/keyed.rb', line 86

def self.calculate_pk_ranges(db, table_name, num_partitions)
  key = Tapsoob::Utils.order_by(db, table_name).first
  ds = db[table_name.to_sym]

  # Get total row count
  total_rows = ds.count
  return [[ds.min(key) || 0, ds.max(key) || 0]] if total_rows == 0 || num_partitions <= 1

  # Calculate target rows per partition
  rows_per_partition = (total_rows.to_f / num_partitions).ceil

  # Find PK boundaries at percentiles using OFFSET
  # This ensures even distribution of ROWS, not PK values
  ranges = []
  (0...num_partitions).each do |i|
    # Calculate row offset for this partition's start
    start_offset = i * rows_per_partition
    end_offset = [(i + 1) * rows_per_partition - 1, total_rows - 1].min

    # Get the PK value at this row offset
    start_pk = ds.order(key).limit(1, start_offset).select(key).first
    start_pk = start_pk ? start_pk[key] : (ds.min(key) || 0)

    # Get the PK value at the end offset (or max for last partition)
    if i == num_partitions - 1
      end_pk = ds.max(key) || start_pk
    else
      end_pk_row = ds.order(key).limit(1, end_offset).select(key).first
      end_pk = end_pk_row ? end_pk_row[key] : start_pk
    end

    ranges << [start_pk, end_pk]
  end

  ranges
end
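
A hedged usage sketch for parallel export: split a table into partitions of roughly equal row counts and hand one range to each worker. The table name, partition count, and example output are assumptions:

# Split the users table into 4 PK ranges of roughly equal row counts.
ranges = Tapsoob::DataStream::Keyed.calculate_pk_ranges(db, :users, 4)
# e.g. [[1, 2500], [2501, 5000], [5001, 7500], [7501, 10000]]  (illustrative values)
ranges.each_with_index do |(start_pk, end_pk), i|
  puts "partition #{i}: #{start_pk}..#{end_pk}"
end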

Instance Method Details

#buffer_limit ⇒ Object



# File 'lib/tapsoob/data_stream/keyed.rb', line 20

def buffer_limit
  if state[:last_fetched] and state[:last_fetched] < state[:filter] and self.buffer.size == 0
    state[:last_fetched]
  else
    state[:filter]
  end
end

#calc_limit(chunksize) ⇒ Object



# File 'lib/tapsoob/data_stream/keyed.rb', line 28

def calc_limit(chunksize)
  # we want to not fetch more than is needed while we're
  # inside sinatra but locally we can select more than
  # is strictly needed
  if defined?(Sinatra)
    (chunksize * 1.1).ceil
  else
    (chunksize * 3).ceil
  end
end
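
For example, with a chunk size of 1,000 the over-fetch factor gives a limit of 1,100 inside a Sinatra process and 3,000 otherwise:

stream.calc_limit(1000)   # => 1100 if Sinatra is defined, 3000 otherwise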

#fetch_buffered(chunksize) ⇒ Object



# File 'lib/tapsoob/data_stream/keyed.rb', line 60

def fetch_buffered(chunksize)
  load_buffer(chunksize) if buffer.size < chunksize
  rows = buffer.slice(0, chunksize)
  state[:last_fetched] = rows.any? ? rows.last[primary_key] : nil
  rows
end

#increment(row_count) ⇒ Object



# File 'lib/tapsoob/data_stream/keyed.rb', line 67

def increment(row_count)
  # pop the rows we just successfully sent off the buffer
  @buffer.slice!(0, row_count)
end
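
A sketch of how #fetch_buffered and #increment pair up when draining a table in chunks; write_rows below is a hypothetical consumer, not part of the gem:

# Fetch a chunk, hand it off, then pop the handled rows off the buffer.
loop do
  rows = stream.fetch_buffered(stream.state[:chunksize])
  break if rows.empty?
  write_rows(rows)              # hypothetical consumer of the chunk
  stream.increment(rows.size)
end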

#load_buffer(chunksize) ⇒ Object



# File 'lib/tapsoob/data_stream/keyed.rb', line 39

def load_buffer(chunksize)
  num = 0
  loop do
    limit = calc_limit(chunksize)
    # we have to use local variables in order for the virtual row filter to work correctly
    key = primary_key
    buf_limit = buffer_limit
    ds = table.order(*order_by).filter { key.sql_number > buf_limit }.limit(limit)
    log.debug "DataStream::Keyed#load_buffer SQL -> #{ds.sql}"
    data = ds.all
    self.buffer += data
    num += data.size
    if data.any?
      # keep a record of the last primary key value in the buffer
      state[:filter] = self.buffer.last[primary_key]
    end

    break if num >= chunksize || data.empty?
  end
end
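
The query built here is plain keyed pagination: order by the primary key, keep only rows whose key is above the last one buffered, and cap the result at the computed limit. A hedged Sequel equivalent of a single iteration, with table, column, and values as assumptions:

# Roughly what one load_buffer pass asks Sequel for, written out directly.
ds = db[:users].order(:id).where { id > 2500 }.limit(3000)
ds.sql   # => something like "SELECT * FROM users WHERE (id > 2500) ORDER BY id LIMIT 3000" (quoting varies by adapter)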

#primary_key ⇒ Object



# File 'lib/tapsoob/data_stream/keyed.rb', line 16

def primary_key
  state[:primary_key].to_sym
end

#verify_stream ⇒ Object



# File 'lib/tapsoob/data_stream/keyed.rb', line 72

def verify_stream
  key = primary_key
  ds = table.order(*order_by)
  current_filter = ds.max(key.sql_number)

  # set the current filter to the max of the primary key
  state[:filter] = current_filter
  # clear out the last_fetched value so it can restart from scratch
  state[:last_fetched] = nil

  log.debug "DataStream::Keyed#verify_stream -> state: #{state.inspect}"
end
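
A short, purely illustrative sketch of calling it to reset a stream's bookkeeping:

stream.verify_stream
stream.state   # :filter now holds the table's max primary key value; :last_fetched is nil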