Class: Tapsoob::Pull

Inherits:
Operation show all
Defined in:
lib/tapsoob/operation.rb

Instance Attribute Summary

Attributes inherited from Operation

#database_url, #dump_path, #opts

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Operation

#apply_table_filter, #catch_errors, #completed_tables, #db, #default_chunksize, #exclude_tables, #exiting?, #format_number, #indexes_first?, #initialize, #log, #resuming?, #setup_signal_trap, #skip_schema?, #store_session, #stream_state, #stream_state=, #table_filter

Constructor Details

This class inherits a constructor from Tapsoob::Operation

Class Method Details

.factory(db, state) ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/tapsoob/operation.rb', line 273

def self.factory(db, state)
  if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
    Sequel::MySQL.convert_invalid_date_time = :nil
  end

  if state.has_key?(:klass)
    return eval(state[:klass]).new(db, state)
  end

  if Tapsoob::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
    DataStreamKeyed.new(db, state)
  else
    DataStream.new(db, state)
  end
end

Instance Method Details

#fetch_tables_infoObject



263
264
265
266
267
268
269
270
271
# File 'lib/tapsoob/operation.rb', line 263

def fetch_tables_info
  tables = db.tables

  data = {}
  apply_table_filter(tables).each do |table_name|
    data[table_name] = db[table_name].count
  end
  data
end

#file_prefixObject



155
156
157
# File 'lib/tapsoob/operation.rb', line 155

def file_prefix
  "pull"
end

#pull_dataObject



191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/tapsoob/operation.rb', line 191

def pull_data
  log.info "Receiving data"

  log.info "#{tables.size} tables, #{format_number(record_count)} records"

  tables.each do |table_name, count|
    progress = (opts[:progress] ? ProgressBar.new(table_name.to_s, count) : nil)
    stream   = Tapsoob::DataStream.factory(db, {
      :chunksize  => default_chunksize,
      :table_name => table_name
    })
    pull_data_from_table(stream, progress)
  end
end

#pull_data_from_table(stream, progress) ⇒ Object



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/tapsoob/operation.rb', line 218

def pull_data_from_table(stream, progress)
  loop do
    begin
      exit 0 if exiting?

      size = stream.fetch_database do |rows|
        if dump_path.nil?
          puts JSON.generate(rows)
        else
          Tapsoob::Utils.export_rows(dump_path, stream.table_name, rows)
        end
      end
      break if stream.complete?
      progress.inc(size) if progress && !exiting?
      stream.error = false
      self.stream_state = stream.to_hash
    rescue Tapsoob::CorruptedData => e
      log.info "Corrupted Data Received #{e.message}, retrying..."
      stream.error = true
      next
    end
  end

  progress.finish if progress
  completed_tables << stream.table_name.to_s
  self.stream_state = {}
end

#pull_indexesObject



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/tapsoob/operation.rb', line 289

def pull_indexes
  log.info "Receiving indexes"

  raw_idxs = Tapsoob::Utils.schema_bin(:indexes_individual, database_url)
  idxs     = (raw_idxs && raw_idxs.length >= 2 ? JSON.parse(raw_idxs) : {})

  apply_table_filter(idxs).each do |table, indexes|
    next unless indexes.size > 0
    progress = ProgressBar.new(table, indexes.size)
    indexes.each do |idx|
      output = Tapsoob::Utils.export_indexes(dump_path, table, idx)
      puts output if output
      progress.inc(1)
    end
    progress.finish
  end
end

#pull_partial_dataObject



206
207
208
209
210
211
212
213
214
215
216
# File 'lib/tapsoob/operation.rb', line 206

def pull_partial_data
  return if stream_state == {}

  table_name = stream_state[:table_name]
  record_count = tables[table_name.to_s]
  log.info "Resuming #{table_name}, #{format_number(record_count)} records"

  progress = (opts[:progress] ? ProgressBar.new(table_name.to_s, record_count) : nil)
  stream = Tapsoob::DataStream.factory(db, stream_state)
  pull_data_from_table(stream, progress)
end

#pull_reset_sequencesObject



307
308
309
310
311
312
# File 'lib/tapsoob/operation.rb', line 307

def pull_reset_sequences
  log.info "Resetting sequences"

  output = Tapsoob::Utils.schema_bin(:reset_db_sequences, database_url)
  puts output if output
end

#pull_schemaObject



177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/tapsoob/operation.rb', line 177

def pull_schema
  log.info "Receiving schema"

  progress = ProgressBar.new('Schema', tables.size)
  tables.each do |table_name, count|
    schema_data = Tapsoob::Schema.dump_table(database_url, table_name)
    log.debug "Table: #{table_name}\n#{schema_data}\n"
    output = Tapsoob::Utils.export_schema(dump_path, table_name, schema_data)
    puts output if output
    progress.inc(1)
  end
  progress.finish
end

#record_countObject



255
256
257
# File 'lib/tapsoob/operation.rb', line 255

def record_count
  tables_info.values.inject(:+)
end

#runObject



163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/tapsoob/operation.rb', line 163

def run
  catch_errors do
    unless resuming?
      pull_schema if !skip_schema?
      pull_indexes if indexes_first? && !skip_schema?
    end
    setup_signal_trap
    pull_partial_data if resuming?
    pull_data
    pull_indexes if !indexes_first? && !skip_schema?
    pull_reset_sequences
  end
end

#tablesObject



246
247
248
249
250
251
252
253
# File 'lib/tapsoob/operation.rb', line 246

def tables
  h = {}
  tables_info.each do |table_name, count|
    next if completed_tables.include?(table_name.to_s)
    h[table_name.to_s] = count
  end
  h
end

#tables_infoObject



259
260
261
# File 'lib/tapsoob/operation.rb', line 259

def tables_info
  opts[:tables_info] ||= fetch_tables_info
end

#to_hashObject



159
160
161
# File 'lib/tapsoob/operation.rb', line 159

def to_hash
  super.merge(:remote_tables_info => remote_tables_info)
end