Class: Taps::DataStream

Inherits:
Object
Defined in:
lib/taps/data_stream.rb

Direct Known Subclasses

DataStreamKeyed

Constant Summary

DEFAULT_CHUNKSIZE = 1000

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(db, state) ⇒ DataStream

Returns a new instance of DataStream.



# File 'lib/taps/data_stream.rb', line 15

def initialize(db, state)
  @db = db
  @state = {
    :offset => 0,
    :avg_chunksize => 0,
    :num_chunksize => 0,
    :total_chunksize => 0,
  }.merge(state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @complete = false
end
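
A minimal construction sketch, assuming a Sequel database handle; the connection URL and table name are hypothetical. Counters not supplied in the state hash pick up the defaults above:

require 'sequel'
require 'taps/data_stream'

db     = Sequel.connect('sqlite://example.db')             # hypothetical URL
stream = Taps::DataStream.new(db, :table_name => 'users')  # hypothetical table

stream.state[:chunksize]   # => 1000 (DEFAULT_CHUNKSIZE)
stream.state[:offset]      # => 0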

Instance Attribute Details

#db ⇒ Object (readonly)

Returns the value of attribute db.



# File 'lib/taps/data_stream.rb', line 13

def db
  @db
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/taps/data_stream.rb', line 13

def state
  @state
end

Class Method Details

.factory(db, state) ⇒ Object



# File 'lib/taps/data_stream.rb', line 228

def self.factory(db, state)
  if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
    Sequel::MySQL.convert_invalid_date_time = :nil
  end

  if state.has_key?(:klass)
    return eval(state[:klass]).new(db, state)
  end

  if Taps::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
    DataStreamKeyed.new(db, state)
  else
    DataStream.new(db, state)
  end
end
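
A dispatch sketch: tables with a single integer primary key get the keyed stream, everything else the plain one. The :klass branch pins the type explicitly, which is how a stream revived from a serialized state hash keeps its original class. The table name is hypothetical:

stream = Taps::DataStream.factory(db, :table_name => 'users')
stream.class   # Taps::DataStreamKeyed if `users` has a single integer
               # primary key, Taps::DataStream otherwise

pinned = Taps::DataStream.factory(db, :table_name => 'users',
                                      :klass => 'Taps::DataStream')
pinned.class   # => Taps::DataStream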

.parse_json(json) ⇒ Object



# File 'lib/taps/data_stream.rb', line 182

def self.parse_json(json)
  hash = OkJson.decode(json).symbolize_keys
  hash[:state].symbolize_keys! if hash.has_key?(:state)
  hash
end
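
A decoding sketch with an illustrative payload; note that only the top level and the nested :state hash are symbolized:

hash = Taps::DataStream.parse_json('{"checksum":123,"state":{"offset":500}}')
hash[:checksum]         # => 123
hash[:state][:offset]   # => 500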

Instance Method Details

#complete? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/taps/data_stream.rb', line 126

def complete?
  @complete
end

#encode_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 108

def encode_rows(rows)
  Taps::Utils.base64encode(Marshal.dump(rows))
end
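
The wire format is a Marshal dump wrapped in Base64. A round-trip sketch, assuming Taps::Utils.base64decode is the inverse of the encode helper used here:

rows    = { :header => [:id, :name], :data => [[1, 'alice'], [2, 'bob']] }
encoded = stream.encode_rows(rows)
Marshal.load(Taps::Utils.base64decode(encoded)) == rows   # => true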

#error ⇒ Object



# File 'lib/taps/data_stream.rb', line 35

def error
  state[:error] || false
end

#error=(val) ⇒ Object



# File 'lib/taps/data_stream.rb', line 31

def error=(val)
  state[:error] = val
end

#fetch ⇒ Object



# File 'lib/taps/data_stream.rb', line 112

def fetch
  log.debug "DataStream#fetch state -> #{state.inspect}"

  t1 = Time.now
  rows = fetch_rows
  encoded_data = encode_rows(rows)
  t2 = Time.now
  elapsed_time = t2 - t1

  @complete = rows == { }

  [encoded_data, (@complete ? 0 : rows[:data].size), elapsed_time]
end
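
A local pull-loop sketch built on #fetch, #complete?, and #increment; what happens to the encoded chunk is up to the caller:

until stream.complete?
  encoded_data, row_count, elapsed = stream.fetch
  break if stream.complete?   # the final fetch returns an empty chunk
  # hand encoded_data to the receiving side, then advance the offset
  stream.increment(row_count)
end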

#fetch_chunksize ⇒ Object



# File 'lib/taps/data_stream.rb', line 93

def fetch_chunksize
  chunksize = state[:chunksize]
  return chunksize if state[:num_chunksize] < max_chunksize_training
  return chunksize if state[:avg_chunksize] == 0
  return chunksize if state[:error]
  state[:avg_chunksize] > chunksize ? state[:avg_chunksize] : chunksize
end
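
In effect, once training finishes the chunksize can no longer shrink below the observed average. A sketch that fakes a post-training state directly:

stream.state.merge!(:chunksize => 100, :avg_chunksize => 250,
                    :num_chunksize => 20, :error => false)
stream.fetch_chunksize   # => 250, the larger of avg_chunksize and chunksize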

#fetch_from_resource(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 165

def fetch_from_resource(resource, headers)
  res = nil
  log.debug "DataStream#fetch_from_resource state -> #{state.inspect}"
  state[:chunksize] = Taps::Utils.calculate_chunksize(state[:chunksize]) do |c|
    state[:chunksize] = c.to_i
    res = resource.post({:state => OkJson.encode(self.to_hash)}, headers)
  end

  begin
    params = Taps::Multipart.parse(res)
    params[:json] = self.class.parse_json(params[:json]) if params.has_key?(:json)
    return params
  rescue OkJson::Parser
    raise Taps::CorruptedData.new("Invalid OkJson Received")
  end
end

#fetch_remote(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 130

def fetch_remote(resource, headers)
  params = fetch_from_resource(resource, headers)
  encoded_data = params[:encoded_data]
  json = params[:json]

  rows = parse_encoded_data(encoded_data, json[:checksum])
  @complete = rows == { }

  # update local state
  state.merge!(json[:state].merge(:chunksize => state[:chunksize]))

  unless @complete
    import_rows(rows)
    rows[:data].size
  else
    0
  end
end
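
A client-side transfer loop sketch, assuming `resource` is a RestClient::Resource pointed at a taps server and `headers` carries the usual taps HTTP headers. No explicit #increment is needed: #import_rows advances the offset, and merging the server's state keeps both sides in sync:

until stream.complete?
  row_count = stream.fetch_remote(resource, headers)
  Taps.log.debug "imported #{row_count} rows into #{stream.table_name}"
end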

#fetch_remote_in_server(params) ⇒ Object

This variant runs inside the server process: it decodes the params posted by the client and imports the rows locally.



# File 'lib/taps/data_stream.rb', line 150

def fetch_remote_in_server(params)
  json = self.class.parse_json(params[:json])
  encoded_data = params[:encoded_data]

  rows = parse_encoded_data(encoded_data, json[:checksum])
  @complete = rows == { }

  unless @complete
    import_rows(rows)
    rows[:data].size
  else
    0
  end
end

#fetch_rows ⇒ Object

Fetches the next chunk of rows, keeping a record of the average chunksize during the training period (the first #max_chunksize_training chunks); #fetch_chunksize later uses that average as a floor for subsequent fetches.



# File 'lib/taps/data_stream.rb', line 76

def fetch_rows
  state[:chunksize] = fetch_chunksize
  ds = table.order(*order_by).limit(state[:chunksize], state[:offset])
  log.debug "DataStream#fetch_rows SQL -> #{ds.sql}"
  rows = Taps::Utils.format_data(ds.all,
    :string_columns => string_columns,
    :schema => db.schema(table_name),
    :table  => table_name
  )
  update_chunksize_stats
  rows
end
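
A sketch of the returned shape, assuming Taps::Utils.format_data keeps the header/data split that #import_rows consumes; an exhausted offset yields an empty hash, which is how #fetch detects completion:

rows = stream.fetch_rows
# => { :header => [:id, :name, ...],
#      :data   => [[1, 'alice'], [2, 'bob'], ...] }
# => {} once the offset is past the last row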

#import_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 202

def import_rows(rows)
  table.import(rows[:header], rows[:data])
  state[:offset] += rows[:data].size
rescue Exception => ex
  case ex.message
  when /integer out of range/ then
    raise Taps::InvalidData, <<-ERROR, []
\nDetected integer data that exceeds the maximum allowable size for an integer type.
This generally occurs when importing from SQLite due to the fact that SQLite does
not enforce maximum values on integer types.
    ERROR
  else raise ex
  end
end
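
A usage sketch with a hand-built rows hash (the column names are hypothetical); on success the offset advances by the number of rows imported:

rows = { :header => [:id, :name], :data => [[1, 'alice'], [2, 'bob']] }
stream.import_rows(rows)
stream.state[:offset]   # previous offset + 2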

#increment(row_count) ⇒ Object



# File 'lib/taps/data_stream.rb', line 70

def increment(row_count)
  state[:offset] += row_count
end

#log ⇒ Object



# File 'lib/taps/data_stream.rb', line 27

def log
  Taps.log
end

#max_chunksize_training ⇒ Object



# File 'lib/taps/data_stream.rb', line 89

def max_chunksize_training
  20
end

#order_by(name = nil) ⇒ Object



# File 'lib/taps/data_stream.rb', line 63

def order_by(name=nil)
  @order_by ||= begin
    name ||= table_name
    Taps::Utils.order_by(db, name)
  end
end

#parse_encoded_data(encoded_data, checksum) ⇒ Object



# File 'lib/taps/data_stream.rb', line 188

def parse_encoded_data(encoded_data, checksum)
  raise Taps::CorruptedData.new("Checksum Failed") unless Taps::Utils.valid_data?(encoded_data, checksum)

  begin
    return Marshal.load(Taps::Utils.base64decode(encoded_data))
  rescue Object => e
    unless ENV['NO_DUMP_MARSHAL_ERRORS']
      puts "Error encountered loading data, wrote the data chunk to dump.#{Process.pid}.dat"
      File.open("dump.#{Process.pid}.dat", "w") { |f| f.write(encoded_data) }
    end
    raise
  end
end
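
A round-trip sketch through #encode_rows, assuming Taps::Utils.checksum is the CRC32 helper that valid_data? checks against:

encoded  = stream.encode_rows(:header => [:id], :data => [[1]])
checksum = Taps::Utils.checksum(encoded)
stream.parse_encoded_data(encoded, checksum)
# => { :header => [:id], :data => [[1]] }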

#string_columns ⇒ Object



# File 'lib/taps/data_stream.rb', line 55

def string_columns
  @string_columns ||= Taps::Utils.incorrect_blobs(db, table_name)
end

#table ⇒ Object



# File 'lib/taps/data_stream.rb', line 59

def table
  @table ||= db[table_name_sql]
end

#table_name ⇒ Object



# File 'lib/taps/data_stream.rb', line 39

def table_name
  state[:table_name].to_sym
end

#table_name_sql ⇒ Object



# File 'lib/taps/data_stream.rb', line 43

def table_name_sql
  table_name.identifier
end

#to_hash ⇒ Object



# File 'lib/taps/data_stream.rb', line 47

def to_hash
  state.merge(:klass => self.class.to_s)
end

#to_json ⇒ Object



# File 'lib/taps/data_stream.rb', line 51

def to_json
  OkJson.encode(to_hash)
end
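
Because #to_hash embeds :klass, a serialized stream can be revived with the same class on the other side via .factory; a sketch:

json  = stream.to_json
state = Taps::DataStream.parse_json(json)
Taps::DataStream.factory(db, state).class == stream.class   # => true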

#update_chunksize_stats ⇒ Object



# File 'lib/taps/data_stream.rb', line 101

def update_chunksize_stats
  return if state[:num_chunksize] >= max_chunksize_training
  state[:total_chunksize] += state[:chunksize]
  state[:num_chunksize] += 1
  state[:avg_chunksize] = state[:total_chunksize] / state[:num_chunksize] rescue state[:chunksize]
end
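
A worked example of the running average on a freshly constructed stream (note the integer division):

s = Taps::DataStream.new(db, :table_name => 'users')
[1000, 500, 900].each do |size|
  s.state[:chunksize] = size
  s.update_chunksize_stats
end
s.state[:avg_chunksize]   # => 800 (2400 / 3)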

#verify_remote_stream(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 221

def verify_remote_stream(resource, headers)
  json_raw = resource.post({:state => OkJson.encode(self)}, headers).to_s
  json = self.class.parse_json(json_raw)

  self.class.new(db, json[:state])
end

#verify_stream ⇒ Object



# File 'lib/taps/data_stream.rb', line 217

def verify_stream
  state[:offset] = table.count
end