Class: Myreplicator::Transporter
- Inherits: Object
  - Object
    - Myreplicator::Transporter
- Defined in: lib/transporter/transporter.rb
Class Method Summary
-
.completed_files(ssh, export) ⇒ Object
Lists the files on the source server whose exports are complete and ready to be transferred.
-
.download(export) ⇒ Object
Connects to the source server via SSH and kicks off the parallel download.
-
.download_file ⇒ Object
Returns the code block that each thread calls; instance_exec is used so that the block executes in the context of the Transporter class.
-
.get_done_files(export) ⇒ Object
Builds the shell command that lists the done files; grep -s is used to suppress error messages.
-
.get_dump_path(json_path, metadata = nil) ⇒ Object
Reads metadata file for the export path.
-
.junk_file?(metadata) ⇒ Boolean
Returns true if the file should be deleted.
- .metadata_obj(json_path) ⇒ Object
-
.parallel_download(export, ssh, files) ⇒ Object
Gathers all files that need to be downloaded and hands the queue to the Parallelizer library, which downloads them in parallel.
-
.perform ⇒ Object
Main entry point provided for Resque. Reconnection is provided for Resque workers.
-
.remote_path(export, filename) ⇒ Object
Returns the path of a dump file on the remote server.
- .remove!(ssh, json_file, dump_file) ⇒ Object
-
.transfer ⇒ Object
Connects to all unique database servers and downloads export files concurrently from multiple sources.
Instance Method Summary
-
#initialize(*args) ⇒ Transporter
Constructor. Returns a new instance of Transporter.
-
#schedule(cron) ⇒ Object
Schedules the transport job in Resque.
- #tmp_dir ⇒ Object
Constructor Details
#initialize(*args) ⇒ Transporter
Returns a new instance of Transporter.
# File 'lib/transporter/transporter.rb', line 8
def initialize *args
  = args.
end
Class Method Details
.completed_files(ssh, export) ⇒ Object
Lists the files on the source server whose exports are complete and ready to be transferred.
# File 'lib/transporter/transporter.rb', line 137
def self.completed_files ssh, export
  done_files = ssh.exec!(get_done_files(export))

  unless done_files.blank?
    return done_files.split("\n")
  end

  return []
end
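The empty-output handling in completed_files can be tried outside Rails with a minimal sketch. `FakeSSH` is a hypothetical stand-in for the SSH session, and the nil-or-whitespace check is a plain-Ruby substitute for ActiveSupport's `blank?`. Whether the real session returns nil or an empty string when the command prints nothing is not stated here, so both cases are treated the same way.

```ruby
# Hypothetical stand-in for an SSH session; only exec! is modelled.
class FakeSSH
  def initialize(output)
    @output = output
  end

  def exec!(_command)
    @output
  end
end

# Plain-Ruby equivalent of the blank? check: nil and whitespace-only
# output both yield an empty list.
def completed_files(ssh, command)
  done_files = ssh.exec!(command)
  return [] if done_files.nil? || done_files.strip.empty?

  done_files.split("\n")
end

files = completed_files(FakeSSH.new("a.json\nb.json\n"), "ls")
none  = completed_files(FakeSSH.new(nil), "ls")
```

Returning an empty array rather than nil lets the caller iterate over the result without a separate guard.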
.download(export) ⇒ Object
Connects to the source server via SSH and kicks off the parallel download.
# File 'lib/transporter/transporter.rb', line 53
def self.download export
  ssh = export.ssh_to_source
  parallel_download(export, ssh, completed_files(ssh, export))
end
.download_file ⇒ Object
Returns the code block that each thread calls. instance_exec is used so that the block executes in the context of the Transporter class.
- Connects via SFTP
- Downloads metadata file first
- Gets dump file location from metadata
- Downloads dump file
# File 'lib/transporter/transporter.rb', line 81
def self.download_file
  proc = Proc.new { |params|
    ssh = params[0]
    export = params[1]
    filename = params[2]

    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!

    Log.run(:job_type => "transporter", :name => "metadata_file",
            :file => filename, :export_id => export.id) do |log|
      sftp = export.sftp_to_source
      json_file = Transporter.remote_path(export, filename)
      json_local_path = File.join(tmp_dir,filename)
      puts "Downloading #{json_file}"
      sftp.download!(json_file, json_local_path)
      metadata = Transporter.metadata_obj(json_local_path)
      dump_file = metadata.export_path

      puts metadata.state
      if metadata.state == "export_completed"
        Log.run(:job_type => "transporter", :name => "export_file",
                :file => dump_file, :export_id => export.id) do |log|
          puts "Downloading #{dump_file}"
          sftp.download!(dump_file, File.join(tmp_dir, dump_file.split("/").last))
          Transporter.remove!(ssh, json_file, dump_file)
        end
      elsif Transporter.junk_file?(metadata)
        Transporter.remove!(ssh, json_file, dump_file)
      end #if
    end
  }
end
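The branch at the end of the block amounts to a small mapping from metadata state to action. The sketch below only restates that mapping; the helper name and the symbols are invented for illustration and are not part of the library.

```ruby
# Hypothetical helper: maps a metadata state to what the block does with
# the remote files.
def action_for(state)
  case state
  when "export_completed"
    :download_then_remove # fetch the dump, then delete both remote files
  when "failed", "ignored"
    :remove_only          # junk file: delete without downloading the dump
  else
    :leave_on_server      # any other state is left untouched
  end
end

actions = %w[export_completed failed ignored exporting].map { |s| action_for(s) }
```

Note that files in any other state are neither downloaded nor removed, so they remain on the server for a later run.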
.get_done_files(export) ⇒ Object
Builds the shell command that lists the done files. grep -s is used to suppress error messages.
# File 'lib/transporter/transporter.rb', line 171
def self.get_done_files export
  cmd = "cd #{Myreplicator.configs[export.source_schema]["ssh_tmp_dir"]}; grep -ls export_completed *.json"
end
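To see what command string this produces, here is a sketch that builds it from a plain hash. The `configs` hash, the schema name, and the directory are assumptions that only imitate the shape of Myreplicator.configs.

```ruby
# Hypothetical configuration hash shaped like Myreplicator.configs.
configs = { "sales_db" => { "ssh_tmp_dir" => "/tmp/myreplicator" } }

def done_files_command(configs, source_schema)
  dir = configs[source_schema]["ssh_tmp_dir"]
  # -l prints only the names of files that contain a match.
  # -s silences errors about missing or unreadable files, for example
  # when no *.json file exists yet.
  "cd #{dir}; grep -ls export_completed *.json"
end

cmd = done_files_command(configs, "sales_db")
```

Because the command ends with the method's last expression, the string itself is the return value; the `cmd =` assignment in the original is incidental.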
.get_dump_path(json_path, metadata = nil) ⇒ Object
Reads metadata file for the export path
# File 'lib/transporter/transporter.rb', line 155
def self.get_dump_path json_path, metadata = nil
  metadata = Transporter.metadata_obj(json_path) if metadata.nil?
  return metadata.export_path
end
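The optional second argument lets a caller that has already loaded the metadata skip a second read. A minimal sketch of that pattern follows, assuming the metadata file is a JSON object with an "export_path" key; the real ExportMetadata class may store and expose this differently.

```ruby
require "json"
require "tempfile"

# Hypothetical reader: parses the file only when no metadata is supplied.
def dump_path(json_path, metadata = nil)
  metadata = JSON.parse(File.read(json_path)) if metadata.nil?
  metadata["export_path"]
end

# Write a sample metadata file (contents are invented for illustration).
file = Tempfile.new(["export", ".json"])
file.write(JSON.generate("export_path" => "/tmp/dumps/users.tsv",
                         "state" => "export_completed"))
file.close

from_disk   = dump_path(file.path)
from_memory = dump_path(file.path, { "export_path" => "/elsewhere.tsv" })
file.unlink
```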
.junk_file?(metadata) ⇒ Boolean
Returns true if the file should be deleted
# File 'lib/transporter/transporter.rb', line 119
def self.junk_file? metadata
  case metadata.state
  when "failed"
    return true
  when "ignored"
    return true
  end
  return false
end
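For a quick check of which states count as junk, the same predicate can be written compactly and exercised with a stub. `Metadata` here is a hypothetical Struct standing in for the real metadata object; only its `state` reader is assumed.

```ruby
# Hypothetical stand-in for the metadata object.
Metadata = Struct.new(:state)

# Condensed equivalent of the case statement above.
def junk_file?(metadata)
  %w[failed ignored].include?(metadata.state)
end

results = %w[failed ignored export_completed].map do |state|
  junk_file?(Metadata.new(state))
end
```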
.metadata_obj(json_path) ⇒ Object
# File 'lib/transporter/transporter.rb', line 147
def self.metadata_obj json_path
  metadata = ExportMetadata.new(:metadata_path => json_path)
  return metadata
end
.parallel_download(export, ssh, files) ⇒ Object
Gathers all files that need to be downloaded and hands the queue to the Parallelizer library, which downloads them in parallel.
# File 'lib/transporter/transporter.rb', line 62
def self.parallel_download export, ssh, files
  p = Parallelizer.new(:klass => "Myreplicator::Transporter")

  files.each do |filename|
    puts filename
    p.queue << {:params =>[ssh, export, filename], :block => download_file}
  end

  p.run
end
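The queue-then-run pattern can be approximated with Ruby's built-in Queue and Thread. This is not the Parallelizer implementation, whose internals are not shown here; it is a sketch of the general idea, with the worker count, file names, and placeholder parameters chosen arbitrarily. It also calls the block directly instead of using instance_exec.

```ruby
# Work items mirror the {:params, :block} hashes queued above.
queue   = Queue.new
results = Queue.new # thread-safe collector for the demo output
block   = proc { |params| results << "downloaded #{params[2]}" }

%w[a.json b.json c.json].each do |filename|
  queue << { :params => [:ssh, :export, filename], :block => block }
end

# Two workers drain the queue until it is empty.
workers = Array.new(2) do
  Thread.new do
    loop do
      job = begin
        queue.pop(true) # non-blocking; raises ThreadError when empty
      rescue ThreadError
        break
      end
      job[:block].call(job[:params])
    end
  end
end
workers.each(&:join)

downloaded = []
downloaded << results.pop until results.empty?
```

The order in which jobs finish is not deterministic, so anything that depends on ordering should sort or key the results.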
.perform ⇒ Object
Main entry point provided for Resque. Reconnection is provided for Resque workers.
# File 'lib/transporter/transporter.rb', line 22
def self.perform
  transfer # Kick off the load process
end
.remote_path(export, filename) ⇒ Object
Returns the path of a dump file on the remote server.
# File 'lib/transporter/transporter.rb', line 163
def self.remote_path export, filename
  File.join(Myreplicator.configs[export.source_schema]["ssh_tmp_dir"], filename)
end
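File.join takes care of the separator, so the configured directory may or may not end with a slash. A short sketch with an invented configuration hash and file name:

```ruby
# Hypothetical configuration; note the trailing slash on the directory.
configs = { "sales_db" => { "ssh_tmp_dir" => "/var/exports/" } }

path = File.join(configs["sales_db"]["ssh_tmp_dir"], "users_1.json")
```

Here `path` is "/var/exports/users_1.json"; the duplicate separator is collapsed.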
.remove!(ssh, json_file, dump_file) ⇒ Object
# File 'lib/transporter/transporter.rb', line 129
def self.remove! ssh, json_file, dump_file
  ssh.exec!("rm #{json_file}")
  ssh.exec!("rm #{dump_file}")
end
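The paths are interpolated into the shell command unquoted, so a path containing spaces or shell metacharacters would be misread by the remote shell. If that is a concern, one possible hardening is to escape each path with the standard library's Shellwords. The helper below is a hypothetical sketch, not part of the library.

```ruby
require "shellwords"

# Hypothetical helper: builds an rm command with the path shell-escaped.
def rm_command(path)
  "rm #{Shellwords.escape(path)}"
end

plain  = rm_command("/tmp/myreplicator/users.json")
spaced = rm_command("/tmp/my dumps/users.json")
```

Ordinary paths pass through unchanged, while the space in the second path is escaped with a backslash.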
.transfer ⇒ Object
Connects to all unique database servers and downloads export files concurrently from multiple sources.
# File 'lib/transporter/transporter.rb', line 41
def self.transfer
  unique_jobs = Export.where("active = 1").group("source_schema")

  unique_jobs.each do |export|
    download export
  end
end
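The query keeps one active export per source schema, so each server is contacted once. An in-memory analogue is sketched below; `ExportRow` and its data are invented, and unlike SQL grouping (where the row chosen for each group depends on the database), `uniq` always keeps the first match.

```ruby
# Hypothetical rows standing in for Export records.
ExportRow = Struct.new(:id, :source_schema, :active)

exports = [
  ExportRow.new(1, "sales_db", true),
  ExportRow.new(2, "sales_db", true),
  ExportRow.new(3, "crm_db",   true),
  ExportRow.new(4, "hr_db",    false)
]

# One representative per schema, active exports only.
unique_jobs = exports.select(&:active).uniq(&:source_schema)
schemas = unique_jobs.map(&:source_schema)
```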
Instance Method Details
#schedule(cron) ⇒ Object
Schedules the transport job in Resque
# File 'lib/transporter/transporter.rb', line 29
def schedule cron
  Resque.set_schedule("myreplicator_transporter", {
                        :cron => cron,
                        :class => "Myreplicator::Transporter",
                        :queue => "myreplicator_transporter"
                      })
end
#tmp_dir ⇒ Object
# File 'lib/transporter/transporter.rb', line 12
def tmp_dir
  @tmp_dir ||= File.join(Myreplicator.app_root,"tmp", "myreplicator")
  Dir.mkdir(@tmp_dir) unless File.directory?(@tmp_dir)
  @tmp_dir
end
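Dir.mkdir creates a single directory level and raises Errno::ENOENT when the parent ("tmp" here) does not exist. If that situation can arise, FileUtils.mkdir_p is one alternative, since it creates missing parents and does nothing when the directory is already there. In this sketch a temporary directory plays the role of Myreplicator.app_root, which is an assumption made so the example can run anywhere.

```ruby
require "fileutils"
require "tmpdir"

app_root = Dir.mktmpdir # hypothetical stand-in for Myreplicator.app_root
tmp_dir  = File.join(app_root, "tmp", "myreplicator")

FileUtils.mkdir_p(tmp_dir) # creates "tmp" as well, since it is missing
FileUtils.mkdir_p(tmp_dir) # safe to call again on an existing directory

created = File.directory?(tmp_dir)
FileUtils.remove_entry(app_root) # clean up the demo directory
```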