Class: Tapsoob::DataStream::Interleaved

Inherits:
Base < Object
Defined in:
lib/tapsoob/data_stream/interleaved.rb

Overview

DataStream variant that partitions a table into interleaved chunks across multiple parallel workers, intended for tables without an integer primary key.
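
The interleaving assigns each worker every num_workers-th chunk of the table rather than a contiguous range. A minimal sketch of the offset arithmetic used by #fetch_rows below (the worker count and chunk size are made-up values):

# Two workers, 1_000-row chunks: each worker takes every 2nd chunk.
num_workers = 2
chunksize   = 1_000

(0...num_workers).each do |worker_id|
  offsets = (0...3).map do |chunk_number|
    global_chunk_index = (chunk_number * num_workers) + worker_id
    global_chunk_index * chunksize
  end
  puts "worker #{worker_id} reads rows at offsets #{offsets.inspect}"
end
# worker 0 reads rows at offsets [0, 2000, 4000]
# worker 1 reads rows at offsets [1000, 3000, 5000]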

Constant Summary

Constants inherited from Base

Base::DEFAULT_CHUNKSIZE

Instance Attribute Summary

Attributes inherited from Base

#db, #options, #state

Instance Method Summary

Methods inherited from Base

#encode_rows, #error, #error=, factory, #fetch_data_from_database, #fetch_data_to_database, #fetch_file, #import_rows, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats, #verify_stream

Constructor Details

#initialize(db, state, opts = {}) ⇒ Interleaved

Returns a new instance of Interleaved.



# File 'lib/tapsoob/data_stream/interleaved.rb', line 8

def initialize(db, state, opts = {})
  super(db, state, opts)
  # :worker_id = which worker this is (0-indexed)
  # :num_workers = total number of workers
  # :chunk_number = current chunk number for this worker
  @state = {
    :worker_id => 0,
    :num_workers => 1,
    :chunk_number => 0
  }.merge(@state)
end
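
A hypothetical instantiation (the connection URL and the :table_name state key are illustrative assumptions; only :worker_id, :num_workers and :chunk_number are defaulted by this constructor, while :chunksize is read by #fetch_rows below):

require "sequel"
require "tapsoob/data_stream/interleaved"

db = Sequel.connect("postgres://localhost/mydb")  # illustrative URL

# Second of four parallel workers (worker ids are 0-indexed).
stream = Tapsoob::DataStream::Interleaved.new(db, {
  :table_name  => :events,  # assumed key, consumed by Base helpers
  :chunksize   => 1_000,    # read by #fetch_rows
  :worker_id   => 1,
  :num_workers => 4
})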

Instance Method Details

#complete? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/tapsoob/data_stream/interleaved.rb', line 75

def complete?
  state[:offset] >= state[:size]
end

#fetch(opts = {}) ⇒ Object



# File 'lib/tapsoob/data_stream/interleaved.rb', line 48

def fetch(opts = {})
  opts = (opts.empty? ? { :type => "database", :source => db.uri } : opts)

  log.debug "DataStream::Interleaved#fetch state -> #{state.inspect}"

  t1 = Time.now
  rows = (opts[:type] == "file" ? fetch_file(opts[:source]) : fetch_rows)
  encoded_data = encode_rows(rows)
  t2 = Time.now
  elapsed_time = t2 - t1

  row_count = (rows == {} ? 0 : rows[:data].size)

  # Always increment chunk number to avoid infinite loops
  # Even if we got 0 rows, move to the next chunk position
  state[:chunk_number] += 1
  state[:offset] += row_count

  [encoded_data, row_count, elapsed_time]
end
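
One way to drive the stream, sketched under the assumption that the caller seeds state[:offset] (as the offset increment above implies): fetch until a chunk comes back empty, which signals this worker has moved past the end of the table.

loop do
  encoded_data, row_count, elapsed_time = stream.fetch
  break if row_count == 0  # past the end of this worker's interleaved chunks
  puts "chunk #{stream.state[:chunk_number]}: #{row_count} rows in #{elapsed_time.round(3)}s"
end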

#fetch_rows ⇒ Object



# File 'lib/tapsoob/data_stream/interleaved.rb', line 20

def fetch_rows
  worker_id = state[:worker_id]
  num_workers = state[:num_workers]
  chunk_number = state[:chunk_number]
  chunksize = state[:chunksize]

  # Only count once on first fetch
  state[:size] ||= table.count

  # Calculate which global chunk this worker should fetch
  # Worker 0: chunks 0, num_workers, 2*num_workers, ...
  # Worker 1: chunks 1, num_workers+1, 2*num_workers+1, ...
  global_chunk_index = (chunk_number * num_workers) + worker_id
  offset = global_chunk_index * chunksize

  ds = table.order(*order_by).limit(chunksize, offset)
  log.debug "DataStream::Interleaved#fetch_rows SQL -> #{ds.sql} (worker #{worker_id}/#{num_workers}, chunk #{chunk_number})"

  rows = Tapsoob::Utils.format_data(db, ds.all,
    :string_columns => string_columns,
    :schema => db.schema(table_name),
    :table => table_name
  )

  update_chunksize_stats
  rows
end
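
For concreteness: with worker_id = 1, num_workers = 4, chunk_number = 2 and chunksize = 1_000, the global chunk index is (2 * 4) + 1 = 9, giving an offset of 9_000. Sequel's Dataset#limit accepts the offset as an optional second argument, so limit(chunksize, offset) compiles to a LIMIT/OFFSET pair; a quick way to see the generated SQL in isolation, using Sequel's mock adapter (table and column names are illustrative):

DB = Sequel.mock(host: :postgres)
DB[:events].order(:id).limit(1_000, 9_000).sql
# => "SELECT * FROM \"events\" ORDER BY \"id\" LIMIT 1000 OFFSET 9000"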

#increment(row_count) ⇒ Object



# File 'lib/tapsoob/data_stream/interleaved.rb', line 69

def increment(row_count)
  # Legacy code path; not used by the new parallel implementation,
  # which advances chunk_number and offset inside #fetch instead.
  state[:chunk_number] += 1
  state[:offset] += row_count
end