Class: Taps::Pull

Inherits:
Operation show all
Defined in:
lib/taps/operation.rb

Instance Attribute Summary

Attributes inherited from Operation

#database_url, #opts, #remote_url, #session_uri

Instance Method Summary collapse

Methods inherited from Operation

#apply_table_filter, #close_session, #completed_tables, #compression_disabled?, #db, #default_chunksize, #exiting?, factory, #format_number, #http_headers, #indexes_first?, #initialize, #log, #resuming?, #safe_database_url, #safe_remote_url, #safe_url, #server, #session_resource, #set_session, #setup_signal_trap, #store_session, #stream_state, #stream_state=, #table_filter, #verify_server

Constructor Details

This class inherits a constructor from Taps::Operation

Instance Method Details

#fetch_remote_tables_infoObject



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/taps/operation.rb', line 321

def fetch_remote_tables_info
	retries = 0
	max_retries = 10
	begin
		tables = JSON.load(session_resource['pull/table_names'].get(http_headers).to_s)
	rescue RestClient::Exception
		retries += 1
		retry if retries <= max_retries
		puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
		exit(1)
	end

	data = {}
	apply_table_filter(tables).each do |table_name|
		retries = 0
		begin
			count = session_resource['pull/table_count'].post({:table => table_name}, http_headers).to_s.to_i
			data[table_name] = count
		rescue RestClient::Exception
			retries += 1
			retry if retries <= max_retries
			puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
			exit(1)
		end
	end
	data
end

#file_prefixObject



203
204
205
# File 'lib/taps/operation.rb', line 203

def file_prefix
	"pull"
end

#pull_dataObject



252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/taps/operation.rb', line 252

def pull_data
	puts "Receiving data"

	puts "#{tables.size} tables, #{format_number(record_count)} records"

	tables.each do |table_name, count|
		progress = ProgressBar.new(table_name.to_s, count)
		stream = Taps::DataStream.factory(db, {
			:chunksize => default_chunksize,
			:table_name => table_name
		})
		pull_data_from_table(stream, progress)
	end
end

#pull_data_from_table(stream, progress) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/taps/operation.rb', line 279

def pull_data_from_table(stream, progress)
	loop do
		begin
			if exiting?
				store_session
				exit 0
			end

			size = stream.fetch_remote(session_resource['pull/table'], http_headers)
			break if stream.complete?
			progress.inc(size) unless exiting?
			stream.error = false
			self.stream_state = stream.to_hash
		rescue DataStream::CorruptedData => e
			puts "Corrupted Data Received #{e.message}, retrying..."
			stream.error = true
			next
		end
	end

	progress.finish
	completed_tables << stream.table_name.to_s
	self.stream_state = {}
end

#pull_indexesObject



349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/taps/operation.rb', line 349

def pull_indexes
	puts "Receiving indexes"

	idxs = JSON.parse(session_resource['pull/indexes'].get(http_headers).to_s)

	apply_table_filter(idxs).each do |table, indexes|
		next unless indexes.size > 0
		progress = ProgressBar.new(table, indexes.size)
		indexes.each do |idx|
			output = Taps::Utils.load_indexes(database_url, idx)
			puts output if output
			progress.inc(1)
		end
		progress.finish
	end
end

#pull_partial_dataObject



267
268
269
270
271
272
273
274
275
276
277
# File 'lib/taps/operation.rb', line 267

def pull_partial_data
	return if stream_state == {}

	table_name = stream_state[:table_name]
	record_count = tables[table_name.to_s]
	puts "Resuming #{table_name}, #{format_number(record_count)} records"

	progress = ProgressBar.new(table_name.to_s, record_count)
	stream = Taps::DataStream.factory(db, stream_state)
	pull_data_from_table(stream, progress)
end

#pull_reset_sequencesObject



366
367
368
369
370
371
# File 'lib/taps/operation.rb', line 366

def pull_reset_sequences
	puts "Resetting sequences"

	output = Taps::Utils.schema_bin(:reset_db_sequences, database_url)
	puts output if output
end

#pull_schemaObject



238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/taps/operation.rb', line 238

def pull_schema
	puts "Receiving schema"

	progress = ProgressBar.new('Schema', tables.size)
	tables.each do |table_name, count|
		schema_data = session_resource['pull/schema'].post({:table_name => table_name}, http_headers).to_s
		log.debug "Table: #{table_name}\n#{schema_data}\n"
		output = Taps::Utils.load_schema(database_url, schema_data)
		puts output if output
		progress.inc(1)
	end
	progress.finish
end

#record_countObject



313
314
315
# File 'lib/taps/operation.rb', line 313

def record_count
	@record_count ||= remote_tables_info.values.inject(0) { |a,c| a += c }
end

#remote_tables_infoObject



317
318
319
# File 'lib/taps/operation.rb', line 317

def remote_tables_info
	opts[:remote_tables_info] ||= fetch_remote_tables_info
end

#runObject



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/taps/operation.rb', line 211

def run
	verify_server

	begin
		unless resuming?
			pull_schema
			pull_indexes if indexes_first?
		end
		setup_signal_trap
		pull_partial_data if resuming?
		pull_data
		pull_indexes unless indexes_first?
		pull_reset_sequences
		close_session
	rescue RestClient::Exception => e
		store_session
		if e.respond_to?(:response)
			puts "!!! Caught Server Exception"
			puts "HTTP CODE: #{e.http_code}"
			puts "#{e.response.to_s}"
			exit(1)
		else
			raise
		end
	end
end

#tablesObject



304
305
306
307
308
309
310
311
# File 'lib/taps/operation.rb', line 304

def tables
	h = {}
	remote_tables_info.each do |table_name, count|
		next if completed_tables.include?(table_name.to_s)
		h[table_name.to_s] = count
	end
	h
end

#to_hashObject



207
208
209
# File 'lib/taps/operation.rb', line 207

def to_hash
	super.merge(:remote_tables_info => remote_tables_info)
end