Class: Tapsoob::DataStream::Keyed
- Inherits:
-
Base
- Object
- Base
- Tapsoob::DataStream::Keyed
show all
- Defined in:
- lib/tapsoob/data_stream/keyed.rb
Constant Summary
Constants inherited
from Base
Base::DEFAULT_CHUNKSIZE
Instance Attribute Summary collapse
Attributes inherited from Base
#db, #options, #state
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Base
#complete?, #encode_rows, #error, #error=, factory, #fetch, #fetch_data_from_database, #fetch_data_to_database, #fetch_file, #fetch_rows, #import_rows, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats
Constructor Details
#initialize(db, state, opts = {}) ⇒ Keyed
Returns a new instance of Keyed.
9
10
11
12
13
14
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 9
# Builds a keyed data stream on top of the Base constructor.
#
# After Base has processed the arguments, the stream state is completed with
# defaults: the primary key is the first column reported by +order_by+ for
# the table, and the filter starts at 0. Values already present in @state
# win over these defaults, and the chunk size falls back to
# DEFAULT_CHUNKSIZE when unset.
#
# @param db the database handle, forwarded to Base
# @param state [Hash] initial stream state; must expose +:table_name+
# @param opts [Hash] extra options, forwarded to Base
def initialize(db, state, opts = {})
  super(db, state, opts)

  defaults = {
    primary_key: order_by(state[:table_name]).first,
    filter: 0
  }
  @state = defaults.merge(@state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE

  # Rows fetched from the database but not yet consumed.
  @buffer = []
end
|
Instance Attribute Details
#buffer ⇒ Object
Returns the value of attribute buffer.
7
8
9
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 7
# Reader for the +@buffer+ instance variable, which the constructor
# initializes to an empty array.
#
# @return [Array] the current buffer contents
def buffer
@buffer
end
|
Class Method Details
.calculate_pk_ranges(db, table_name, num_partitions) ⇒ Object
Calculates primary-key (PK) ranges for splitting a table into partitions.
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 86
# Splits a table into contiguous primary-key ranges of roughly equal row
# count, so that each range can be processed independently.
#
# The key column is the first column returned by Tapsoob::Utils.order_by.
# Boundaries are found by looking up the key value at specific row offsets
# in key order.
#
# Because the per-partition row count is rounded up, fewer partitions than
# requested may be needed to cover every row (for example 9 rows split 4
# ways needs only 3 partitions of 3 rows). Only partitions that actually
# contain rows are returned; emitting the surplus ones would make them fall
# back to the full [min, max] span and duplicate the whole table.
#
# @param db database handle indexable by table name (+db[:table]+);
#   presumably a Sequel::Database -- confirm against callers
# @param table_name [#to_sym] name of the table to partition
# @param num_partitions [Integer] maximum number of ranges wanted
# @return [Array<Array>] list of +[start_pk, end_pk]+ pairs (inclusive
#   bounds) with at most +num_partitions+ entries; a single
#   +[min, max]+ pair (0 for missing values) when the table is empty or
#   +num_partitions+ is 1 or less
def self.calculate_pk_ranges(db, table_name, num_partitions)
  key = Tapsoob::Utils.order_by(db, table_name).first
  ds = db[table_name.to_sym]
  total_rows = ds.count
  return [[ds.min(key) || 0, ds.max(key) || 0]] if total_rows == 0 || num_partitions <= 1

  rows_per_partition = (total_rows.to_f / num_partitions).ceil
  # Number of partitions that really hold rows; never exceeds num_partitions.
  partition_count = (total_rows.to_f / rows_per_partition).ceil
  ordered = ds.order(key)

  (0...partition_count).map do |i|
    start_row = ordered.limit(1, i * rows_per_partition).select(key).first
    start_pk = start_row ? start_row[key] : (ds.min(key) || 0)

    end_pk =
      if i == partition_count - 1
        # The last partition always extends to the largest key so that no
        # trailing row is left out.
        ds.max(key) || start_pk
      else
        end_row = ordered.limit(1, (i + 1) * rows_per_partition - 1).select(key).first
        end_row ? end_row[key] : start_pk
      end

    [start_pk, end_pk]
  end
end
|
Instance Method Details
#buffer_limit ⇒ Object
20
21
22
23
24
25
26
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 20
# Picks the key value above which the next batch of rows is selected.
#
# Normally this is +state[:filter]+. When a +state[:last_fetched]+ value
# exists, is smaller than the filter, and the buffer is empty,
# +state[:last_fetched]+ is returned instead.
#
# @return the lower bound (exclusive) for the next key comparison
def buffer_limit
  last_fetched = state[:last_fetched]
  filter = state[:filter]

  behind_filter = last_fetched && last_fetched < filter && buffer.empty?
  behind_filter ? last_fetched : filter
end
|
#calc_limit(chunksize) ⇒ Object
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 28
# Computes how many rows to request from the database for one buffer load.
#
# The chunk size is scaled by 1.1 when the Sinatra constant is defined and
# by 3 otherwise, then rounded up.
#
# @param chunksize [Numeric] number of rows wanted per chunk
# @return [Integer] row limit for the query
def calc_limit(chunksize)
  multiplier = defined?(Sinatra) ? 1.1 : 3
  (chunksize * multiplier).ceil
end
|
#fetch_buffered(chunksize) ⇒ Object
60
61
62
63
64
65
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 60
# Returns up to +chunksize+ rows from the front of the buffer, loading more
# rows first when the buffer holds fewer than +chunksize+.
#
# The rows are not removed from the buffer here. +state[:last_fetched]+ is
# set to the primary-key value of the last returned row, or to nil when no
# rows are returned.
#
# @param chunksize [Integer] maximum number of rows to return
# @return [Array] the leading rows of the buffer
def fetch_buffered(chunksize)
  load_buffer(chunksize) if chunksize > buffer.size

  rows = buffer[0, chunksize]
  state[:last_fetched] =
    if rows.any?
      rows.last[primary_key]
    end

  rows
end
|
#increment(row_count) ⇒ Object
67
68
69
70
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 67
# Removes the first +row_count+ elements from +@buffer+ in place.
#
# NOTE(review): presumably called once the rows handed out by
# fetch_buffered have been processed -- confirm against callers.
#
# @param row_count [Integer] number of leading elements to drop
# @return [Array, nil] the result of Array#slice!, i.e. the removed elements
def increment(row_count)
@buffer.slice!(0, row_count)
end
|
#load_buffer(chunksize) ⇒ Object
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 39
# Appends rows to the buffer by querying the table repeatedly.
#
# Each query selects rows whose primary key is greater than the current
# +buffer_limit+, ordered by +order_by+, and capped at +calc_limit(chunksize)+
# rows. After every non-empty batch +state[:filter]+ is moved to the key of
# the last buffered row. Querying stops once at least +chunksize+ rows have
# been fetched in this call or a query returns nothing.
#
# @param chunksize [Integer] minimum number of rows to try to fetch
# @return [nil]
def load_buffer(chunksize)
  key = primary_key
  fetch_limit = calc_limit(chunksize)
  fetched = 0

  loop do
    lower_bound = buffer_limit
    ds = table.order(*order_by).filter { key.sql_number > lower_bound }.limit(fetch_limit)
    log.debug "DataStream::Keyed#load_buffer SQL -> #{ds.sql}"

    data = ds.all
    self.buffer += data
    fetched += data.size
    state[:filter] = buffer.last[key] if data.any?

    break if data.empty? || fetched >= chunksize
  end
end
|
#primary_key ⇒ Object
16
17
18
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 16
# Name of the key column this stream pages on, taken from
# +state[:primary_key]+ and converted to a Symbol.
#
# @return [Symbol] the primary-key column name
def primary_key
  configured_key = state[:primary_key]
  configured_key.to_sym
end
|
#verify_stream ⇒ Object
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/tapsoob/data_stream/keyed.rb', line 72
# Resets the stream position to the largest primary-key value currently in
# the table.
#
# +state[:filter]+ is set to that maximum and +state[:last_fetched]+ is
# cleared. When the table has no rows the maximum is nil; the filter then
# falls back to 0 (the same default the constructor uses) so that later key
# comparisons in buffer_limit and load_buffer never see a nil bound.
#
# @return the result of the debug log call
def verify_stream
  key = primary_key
  highest_key = table.order(*order_by).max(key.sql_number)

  # An empty table yields nil; keep the filter numeric.
  state[:filter] = highest_key || 0
  state[:last_fetched] = nil
  log.debug "DataStream::Keyed#verify_stream -> state: #{state.inspect}"
end
|