Class: Tapsoob::DataStream::Interleaved
- Inherits: Base (ancestry: Object → Base → Tapsoob::DataStream::Interleaved)
- Defined in:
- lib/tapsoob/data_stream/interleaved.rb
Overview
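DataStream variant that splits a table into fixed-size chunks and interleaves them across parallel workers: worker w of N reads chunks w, w + N, w + 2N, and so on. It is used for tables that have no integer primary key.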
Constant Summary
Constants inherited
from Base
Base::DEFAULT_CHUNKSIZE
Instance Attribute Summary
Attributes inherited from Base
#db, #options, #state
Instance Method Summary
collapse
Methods inherited from Base
#encode_rows, #error, #error=, factory, #fetch_data_from_database, #fetch_data_to_database, #fetch_file, #import_rows, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats, #verify_stream
Constructor Details
#initialize(db, state, opts = {}) ⇒ Interleaved
Returns a new instance of Interleaved.
8
9
10
11
12
13
14
15
16
17
18
|
# File 'lib/tapsoob/data_stream/interleaved.rb', line 8
# Build an interleaved data stream.
#
# Delegates to Base#initialize, then fills in the interleaving defaults
# (:worker_id => 0, :num_workers => 1, :chunk_number => 0) for any key the
# state does not already contain. Values already present in @state win over
# the defaults.
#
# @param db [Object] database handle, passed through unchanged to Base
# @param state [Hash] initial stream state, passed through unchanged to Base
# @param opts [Hash] stream options, passed through unchanged to Base
def initialize(db, state, opts = {})
  super(db, state, opts)
  interleave_defaults = {
    :worker_id    => 0,
    :num_workers  => 1,
    :chunk_number => 0
  }
  @state = interleave_defaults.merge(@state)
end
|
Instance Method Details
#complete? ⇒ Boolean
75
76
77
|
# File 'lib/tapsoob/data_stream/interleaved.rb', line 75
# Whether this worker has no further chunks left to fetch.
#
# A worker only reads its own share of the table (every num_workers-th chunk),
# so the rows it has fetched (state[:offset]) never add up to the whole table's
# row count (state[:size]) once num_workers > 1. Instead, compute the row
# offset of this worker's *next* chunk, using the same interleaving formula as
# fetch_rows: (chunk_number * num_workers + worker_id) * chunksize. The worker
# is finished once that offset is at or beyond the end of the table.
#
# @return [Boolean] true when the next chunk would start past the last row;
#   false while the table size is still unknown (no fetch has run yet).
def complete?
  size = state[:size]
  # state[:size] is only filled in by the first fetch; until then, more data may exist.
  return false if size.nil?

  next_chunk_index = (state[:chunk_number] * state[:num_workers]) + state[:worker_id]
  next_chunk_index * state[:chunksize] >= size
end
|
#fetch(opts = {}) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/tapsoob/data_stream/interleaved.rb', line 48
# Fetch this worker's next chunk, encode it, and advance the stream state.
#
# @param opts [Hash] where to read rows from. When empty, this defaults to
#   { :type => "database", :source => db.uri }. With :type => "file", rows come
#   from fetch_file(opts[:source]); otherwise they come from fetch_rows.
# @return [Array] [encoded_data, row_count, elapsed_time]. row_count is 0 when
#   the fetched rows are an empty hash. elapsed_time is the number of seconds
#   (Float) spent fetching and encoding.
def fetch(opts = {})
  opts = (opts.empty? ? { :type => "database", :source => db.uri } : opts)

  log.debug "DataStream::Interleaved#fetch state -> #{state.inspect}"

  # Use a monotonic clock so the measured duration is not skewed
  # (or made negative) by wall-clock adjustments.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  rows = (opts[:type] == "file" ? fetch_file(opts[:source]) : fetch_rows)
  encoded_data = encode_rows(rows)
  elapsed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

  row_count = (rows == {} ? 0 : rows[:data].size)
  # Advance chunk_number and offset through the shared helper, so there is a
  # single place that moves the stream forward.
  increment(row_count)

  [encoded_data, row_count, elapsed_time]
end
|
#fetch_rows ⇒ Object
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/tapsoob/data_stream/interleaved.rb', line 20
# Read the chunk of rows that belongs to this worker in the interleaved layout.
#
# Chunks are handed out round-robin: worker w of N reads global chunks
# w, w + N, w + 2N, ... . The chunk read by this call has global index
# chunk_number * num_workers + worker_id. Its first row sits at that index
# times chunksize, in the order given by order_by.
#
# Side effects: caches table.count in state[:size] the first time it is
# needed, logs the generated SQL, and calls update_chunksize_stats.
#
# @return [Object] whatever Tapsoob::Utils.format_data returns for the rows read
def fetch_rows
  worker, workers, chunk, per_chunk =
    state.values_at(:worker_id, :num_workers, :chunk_number, :chunksize)
  state[:size] ||= table.count

  row_offset = (chunk * workers + worker) * per_chunk
  dataset = table.order(*order_by).limit(per_chunk, row_offset)
  log.debug "DataStream::Interleaved#fetch_rows SQL -> #{dataset.sql} (worker #{worker}/#{workers}, chunk #{chunk})"

  formatted = Tapsoob::Utils.format_data(
    db,
    dataset.all,
    :string_columns => string_columns,
    :schema         => db.schema(table_name),
    :table          => table_name
  )
  update_chunksize_stats
  formatted
end
|
#increment(row_count) ⇒ Object
69
70
71
72
73
|
# File 'lib/tapsoob/data_stream/interleaved.rb', line 69
# Advance the stream past one chunk.
#
# Bumps state[:chunk_number] by one, then adds row_count to state[:offset].
#
# @param row_count [Integer] number of rows in the chunk just processed
# @return [Integer] the updated state[:offset]
def increment(row_count)
  current = state
  current[:chunk_number] = current[:chunk_number] + 1
  current[:offset] = current[:offset] + row_count
end
|