Class: Taps::DataStream

Inherits: Object

Defined in: lib/taps/data_stream.rb

Direct Known Subclasses

DataStreamKeyed

Constant Summary

DEFAULT_CHUNKSIZE = 1000


Constructor Details

#initialize(db, state) ⇒ DataStream

Returns a new instance of DataStream.



# File 'lib/taps/data_stream.rb', line 15

def initialize(db, state)
  @db = db
  @state = {
    :offset => 0,
    :avg_chunksize => 0,
    :num_chunksize => 0,
    :total_chunksize => 0,
  }.merge(state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @complete = false
end
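
A stream is built from a Sequel database handle and a state hash; keys not supplied fall back to the defaults above. A minimal sketch, assuming the taps and sequel gems and a hypothetical example.db SQLite file:

require 'sequel'
require 'taps/data_stream'

db     = Sequel.sqlite('example.db')   # hypothetical local database
stream = Taps::DataStream.new(db, :table_name => 'users')

stream.state[:offset]     # => 0
stream.state[:chunksize]  # => 1000 (DEFAULT_CHUNKSIZE)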

Instance Attribute Details

#db ⇒ Object (readonly)

Returns the value of attribute db.



# File 'lib/taps/data_stream.rb', line 13

def db
  @db
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/taps/data_stream.rb', line 13

def state
  @state
end

Class Method Details

.factory(db, state) ⇒ Object



# File 'lib/taps/data_stream.rb', line 218

def self.factory(db, state)
  if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
    Sequel::MySQL.convert_invalid_date_time = :nil
  end

  if state.has_key?(:klass)
    return eval(state[:klass]).new(db, state)
  end

  if Taps::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
    DataStreamKeyed.new(db, state)
  else
    DataStream.new(db, state)
  end
end
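
In practice the factory is the usual entry point: a state hash carrying :klass (as produced by #to_hash) short-circuits detection, and otherwise a table with a single integer primary key gets a DataStreamKeyed. A sketch, assuming a users table whose id column is an integer primary key:

stream = Taps::DataStream.factory(db, :table_name => 'users')
stream.class  # => Taps::DataStreamKeyed

# Rebuilding from a previously serialized state yields the same subclass
resumed = Taps::DataStream.factory(db, stream.to_hash)
resumed.class == stream.class  # => true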

.parse_json(json) ⇒ Object



# File 'lib/taps/data_stream.rb', line 182

def self.parse_json(json)
  hash = JSON.parse(json).symbolize_keys
  hash[:state].symbolize_keys! if hash.has_key?(:state)
  hash
end
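
Both the top-level keys and the nested :state keys come back symbolized (symbolize_keys is taps' Hash extension). For example:

Taps::DataStream.parse_json('{"state":{"offset":100},"checksum":"abc"}')
# => {:state => {:offset => 100}, :checksum => "abc"}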

Instance Method Details

#complete? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/taps/data_stream.rb', line 126

def complete?
  @complete
end

#encode_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 108

def encode_rows(rows)
  Taps::Utils.base64encode(Marshal.dump(rows))
end

#error ⇒ Object



# File 'lib/taps/data_stream.rb', line 35

def error
  state[:error] || false
end

#error=(val) ⇒ Object



# File 'lib/taps/data_stream.rb', line 31

def error=(val)
  state[:error] = val
end

#fetch ⇒ Object



# File 'lib/taps/data_stream.rb', line 112

def fetch
  log.debug "DataStream#fetch state -> #{state.inspect}"

  t1 = Time.now
  rows = fetch_rows
  encoded_data = encode_rows(rows)
  t2 = Time.now
  elapsed_time = t2 - t1

  @complete = rows == { }

  [encoded_data, (@complete ? 0 : rows[:data].size), elapsed_time]
end
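
On the server side of a transfer, each call yields one encoded chunk plus its row count and timing; the offset is advanced separately via #increment. A minimal drain loop might look like this sketch:

loop do
  encoded_data, row_count, elapsed_time = stream.fetch
  break if stream.complete?
  # ... transmit encoded_data to the other side ...
  stream.increment(row_count)
end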

#fetch_chunksize ⇒ Object



# File 'lib/taps/data_stream.rb', line 93

def fetch_chunksize
  chunksize = state[:chunksize]
  return chunksize if state[:num_chunksize] < max_chunksize_training
  return chunksize if state[:avg_chunksize] == 0
  return chunksize if state[:error]
  state[:avg_chunksize] > chunksize ? state[:avg_chunksize] : chunksize
end

#fetch_from_resource(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 165

def fetch_from_resource(resource, headers)
  res = nil
  log.debug "DataStream#fetch_from_resource state -> #{state.inspect}"
  state[:chunksize] = Taps::Utils.calculate_chunksize(state[:chunksize]) do |c|
    state[:chunksize] = c
    res = resource.post({:state => self.to_json}, headers)
  end

  begin
    params = Taps::Multipart.parse(res)
    params[:json] = self.class.parse_json(params[:json]) if params.has_key?(:json)
    return params
  rescue JSON::ParserError
    raise Taps::CorruptedData.new("Invalid JSON Received")
  end
end

#fetch_remote(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 130

def fetch_remote(resource, headers)
  params = fetch_from_resource(resource, headers)
  encoded_data = params[:encoded_data]
  json = params[:json]

  rows = parse_encoded_data(encoded_data, json[:checksum])
  @complete = rows == { }

  # update local state
  state.merge!(json[:state].merge(:chunksize => state[:chunksize]))

  unless @complete
    import_rows(rows)
    rows[:data].size
  else
    0
  end
end
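
On the client side of a pull this is typically called in a loop until the stream reports completion. A hedged sketch, where session_resource is a RestClient-style resource for the remote taps server and headers carries the session (both hypothetical names here):

loop do
  row_count = stream.fetch_remote(session_resource, headers)
  break if stream.complete?
  puts "imported #{row_count} rows, offset now #{stream.state[:offset]}"
end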

#fetch_remote_in_server(params) ⇒ Object

This variant runs inside the server process; #fetch_remote is its client-side counterpart.



# File 'lib/taps/data_stream.rb', line 150

def fetch_remote_in_server(params)
  json = self.class.parse_json(params[:json])
  encoded_data = params[:encoded_data]

  rows = parse_encoded_data(encoded_data, json[:checksum])
  @complete = rows == { }

  unless @complete
    import_rows(rows)
    rows[:data].size
  else
    0
  end
end

#fetch_rows ⇒ Object

Keeps a record of the average chunksize within the first few chunks of the transfer (see #max_chunksize_training); this could also apply once the chunksize drops below 100, or perhaps when the offset grows beyond 1000.



# File 'lib/taps/data_stream.rb', line 76

def fetch_rows
  state[:chunksize] = fetch_chunksize
  ds = table.order(*order_by).limit(state[:chunksize], state[:offset])
  log.debug "DataStream#fetch_rows SQL -> #{ds.sql}"
  rows = Taps::Utils.format_data(ds.all,
    :string_columns => string_columns,
    :schema => db.schema(table_name),
    :table  => table_name
  )
  update_chunksize_stats
  rows
end

#import_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 202

def import_rows(rows)
  table.import(rows[:header], rows[:data])
  state[:offset] += rows[:data].size
end
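
The rows hash has the shape produced by Taps::Utils.format_data: a :header array of column names and a :data array of row arrays, which maps directly onto Sequel's bulk Dataset#import. For example:

rows = {
  :header => [:id, :name],
  :data   => [[1, 'alice'], [2, 'bob']]
}
stream.import_rows(rows)
stream.state[:offset]  # advanced by 2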

#increment(row_count) ⇒ Object



# File 'lib/taps/data_stream.rb', line 70

def increment(row_count)
  state[:offset] += row_count
end

#log ⇒ Object



# File 'lib/taps/data_stream.rb', line 27

def log
  Taps.log
end

#max_chunksize_training ⇒ Object



# File 'lib/taps/data_stream.rb', line 89

def max_chunksize_training
  20
end

#order_by(name = nil) ⇒ Object



# File 'lib/taps/data_stream.rb', line 63

def order_by(name=nil)
  @order_by ||= begin
    name ||= table_name
    Taps::Utils.order_by(db, name)
  end
end

#parse_encoded_data(encoded_data, checksum) ⇒ Object



# File 'lib/taps/data_stream.rb', line 188

def parse_encoded_data(encoded_data, checksum)
  raise Taps::CorruptedData.new("Checksum Failed") unless Taps::Utils.valid_data?(encoded_data, checksum)

  begin
    return Marshal.load(Taps::Utils.base64decode(encoded_data))
  rescue Object => e
    unless ENV['NO_DUMP_MARSHAL_ERRORS']
      puts "Error encountered loading data, wrote the data chunk to dump.#{Process.pid}.dat"
      File.open("dump.#{Process.pid}.dat", "w") { |f| f.write(encoded_data) }
    end
    raise
  end
end
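
Decoding is the inverse of #encode_rows, guarded by the checksum sent alongside the payload. A round-trip sketch, assuming Taps::Utils.checksum is the CRC32 helper paired with Taps::Utils.valid_data?:

rows    = { :header => [:id], :data => [[1], [2]] }
encoded = stream.encode_rows(rows)
crc     = Taps::Utils.checksum(encoded)

stream.parse_encoded_data(encoded, crc)  # => {:header => [:id], :data => [[1], [2]]}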

#string_columns ⇒ Object



# File 'lib/taps/data_stream.rb', line 55

def string_columns
  @string_columns ||= Taps::Utils.incorrect_blobs(db, table_name)
end

#table ⇒ Object



# File 'lib/taps/data_stream.rb', line 59

def table
  @table ||= db[table_name_sql]
end

#table_name ⇒ Object



# File 'lib/taps/data_stream.rb', line 39

def table_name
  state[:table_name].to_sym
end

#table_name_sql ⇒ Object



# File 'lib/taps/data_stream.rb', line 43

def table_name_sql
  table_name.identifier
end

#to_hash ⇒ Object



# File 'lib/taps/data_stream.rb', line 47

def to_hash
  state.merge(:klass => self.class.to_s)
end

#to_json ⇒ Object



# File 'lib/taps/data_stream.rb', line 51

def to_json
  to_hash.to_json
end
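
Because #to_hash embeds :klass, a serialized state is enough for .factory to rebuild the identical stream class on the other end:

require 'json'

state = JSON.parse(stream.to_json).symbolize_keys  # symbolize_keys is taps' Hash extension
other = Taps::DataStream.factory(db, state)
other.class == stream.class  # => true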

#update_chunksize_stats ⇒ Object



# File 'lib/taps/data_stream.rb', line 101

def update_chunksize_stats
  return if state[:num_chunksize] >= max_chunksize_training
  state[:total_chunksize] += state[:chunksize]
  state[:num_chunksize] += 1
  state[:avg_chunksize] = state[:total_chunksize] / state[:num_chunksize] rescue state[:chunksize]
end
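
The average is plain integer arithmetic over the chunks recorded so far (the trailing rescue is a defensive guard around the division). For instance, after chunks of 1000, 1500, and 500 rows:

# total_chunksize = 3000, num_chunksize = 3
# => :avg_chunksize becomes 3000 / 3 = 1000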

#verify_remote_stream(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 211

def verify_remote_stream(resource, headers)
  json_raw = resource.post({:state => self.to_json}, headers).to_s
  json = self.class.parse_json(json_raw)

  self.class.new(db, json[:state])
end

#verify_stream ⇒ Object



# File 'lib/taps/data_stream.rb', line 207

def verify_stream
  state[:offset] = table.count
end