Class: Myreplicator::Transporter

Inherits:
Object
  • Object
show all
Defined in:
lib/transporter/transporter.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Transporter

Returns a new instance of Transporter.



8
9
10
# File 'lib/transporter/transporter.rb', line 8

# Builds a Transporter.
#
# Any trailing options hash is pulled off +args+ via extract_options!;
# the extracted options are not used by the constructor.
def initialize(*args)
  args.extract_options!
end

Class Method Details

.completed_files(ssh, export) ⇒ Object

Gets all files ready to be exported from server



137
138
139
140
141
142
143
144
145
# File 'lib/transporter/transporter.rb', line 137

# Lists the files on the source server that are ready to be transferred.
#
# Runs the command built by get_done_files over the given SSH session and
# splits its output into one entry per line.
#
# ssh    - session object responding to exec!
# export - export whose done files are being listed
#
# Returns an Array of file names; empty when the command produced no output.
def self.completed_files(ssh, export)
  listing = ssh.exec!(get_done_files(export))
  listing.blank? ? [] : listing.split("\n")
end

.download(export) ⇒ Object

Connects to the server via SSH and kicks off the parallel download.



53
54
55
56
# File 'lib/transporter/transporter.rb', line 53

# Opens an SSH session to the export's source, looks up the files that are
# ready there, and hands them to parallel_download.
#
# export - export record responding to ssh_to_source
#
# Returns whatever parallel_download returns.
def self.download(export)
  ssh = export.ssh_to_source
  ready_files = completed_files(ssh, export)
  parallel_download(export, ssh, ready_files)
end

.download_file ⇒ Object

Code block that each thread calls. instance_exec is used to execute it under the Transporter class.

  1. Connects via SFTP

  2. Downloads metadata file first

  3. Gets dump file location from metadata

  4. Downloads dump file



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/transporter/transporter.rb', line 81

# Builds the code block that each download thread runs.
#
# The returned Proc takes a single +params+ array of [ssh, export, filename]
# and then:
#   1. opens an SFTP session to the export's source
#   2. downloads the metadata (JSON) file into tmp_dir
#   3. reads the dump file location and state from that metadata
#   4. when the state is "export_completed", downloads the dump file and
#      removes both remote files; when the file is junk, removes both
#      remote files without downloading the dump
#
# tmp_dir is an instance method, so the Proc must be evaluated against a
# Transporter instance (presumably via the parallelizer's instance_exec;
# confirm against Parallelizer).
#
# Returns a Proc.
def self.download_file
  Proc.new { |params|
    ssh = params[0]
    export = params[1]
    filename = params[2]

    # Refresh the database connection before this thread uses it.
    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!

    Log.run(:job_type => "transporter", :name => "metadata_file",
            :file => filename, :export_id => export.id) do |_log|

      sftp = export.sftp_to_source
      json_file = Transporter.remote_path(export, filename)
      json_local_path = File.join(tmp_dir, filename)
      puts "Downloading #{json_file}"
      sftp.download!(json_file, json_local_path)

      # The metadata file tells us where the dump lives and whether the
      # export finished.
      metadata = Transporter.metadata_obj(json_local_path)
      dump_file = metadata.export_path
      puts metadata.state

      if metadata.state == "export_completed"
        Log.run(:job_type => "transporter", :name => "export_file",
                :file => dump_file, :export_id => export.id) do |_export_log|
          puts "Downloading #{dump_file}"
          sftp.download!(dump_file, File.join(tmp_dir, dump_file.split("/").last))
          Transporter.remove!(ssh, json_file, dump_file)
        end
      elsif Transporter.junk_file?(metadata)
        Transporter.remove!(ssh, json_file, dump_file)
      end
    end
  }
end

.get_done_files(export) ⇒ Object

Command for listing done files. grep -s is used to suppress error messages.



171
172
173
# File 'lib/transporter/transporter.rb', line 171

# Builds the shell command that lists finished exports on the source server.
#
# The command changes into the "ssh_tmp_dir" configured for the export's
# source schema and greps the *.json files there for "export_completed"
# (-l prints matching file names only, -s silences error messages).
#
# export - export record responding to source_schema
#
# Returns the command as a String.
def self.get_done_files(export)
  remote_dir = Myreplicator.configs[export.source_schema]["ssh_tmp_dir"]
  "cd #{remote_dir}; grep -ls export_completed *.json"
end

.get_dump_path(json_path, metadata = nil) ⇒ Object

Reads metadata file for the export path



155
156
157
158
# File 'lib/transporter/transporter.rb', line 155

# Reads the export (dump) path out of a metadata file.
#
# json_path - path of the metadata JSON file
# metadata  - already-loaded metadata object; when nil it is loaded from
#             +json_path+ via Transporter.metadata_obj
#
# Returns the metadata's export_path.
def self.get_dump_path(json_path, metadata = nil)
  metadata = Transporter.metadata_obj(json_path) if metadata.nil?
  metadata.export_path
end

.junk_file?(metadata) ⇒ Boolean

Returns true if the file should be deleted

Returns:

  • (Boolean)


119
120
121
122
123
124
125
126
127
# File 'lib/transporter/transporter.rb', line 119

# Tells whether an export's remote files should be deleted without being
# downloaded.
#
# metadata - metadata object responding to state
#
# Returns true when the state is "failed" or "ignored", false otherwise.
def self.junk_file?(metadata)
  %w[failed ignored].include?(metadata.state)
end

.metadata_obj(json_path) ⇒ Object



147
148
149
150
# File 'lib/transporter/transporter.rb', line 147

# Builds the metadata object for a downloaded metadata JSON file.
#
# json_path - local path of the metadata JSON file
#
# NOTE(review): the class constant was lost from this listing; ExportMetadata
# is presumed here. Confirm against lib/transporter/transporter.rb.
#
# Returns the metadata object.
def self.metadata_obj(json_path)
  ExportMetadata.new(:metadata_path => json_path)
end

.parallel_download(export, ssh, files) ⇒ Object

Gathers all files that need to be downloaded and gives the queue to the parallelizer library to download them in parallel.



62
63
64
65
66
67
68
69
70
71
# File 'lib/transporter/transporter.rb', line 62

# Queues one download job per file and runs them through the parallelizer.
#
# Each job carries [ssh, export, filename] as its params and the Proc from
# download_file as its block. Every file name is echoed to stdout as it is
# queued.
#
# export - export the files belong to
# ssh    - SSH session to the source server
# files  - enumerable of metadata file names
#
# Returns the result of the parallelizer's run.
def self.parallel_download(export, ssh, files)
  parallelizer = Parallelizer.new(:klass => "Myreplicator::Transporter")

  files.each do |filename|
    puts filename
    job = { :params => [ssh, export, filename], :block => download_file }
    parallelizer.queue << job
  end

  parallelizer.run
end

.perform ⇒ Object

Main method provided for Resque. Reconnection is provided for Resque workers.



22
23
24
# File 'lib/transporter/transporter.rb', line 22

# Job entry point: delegates straight to transfer.
#
# Returns whatever transfer returns.
def self.perform
  transfer
end

.remote_path(export, filename) ⇒ Object

Returns the path of a dump file on the remote server.



163
164
165
# File 'lib/transporter/transporter.rb', line 163

# Resolves a file name to its full path on the remote server.
#
# The directory is the "ssh_tmp_dir" configured for the export's source
# schema.
#
# export   - export record responding to source_schema
# filename - name of the file inside the remote directory
#
# Returns the joined path as a String.
def self.remote_path(export, filename)
  remote_dir = Myreplicator.configs[export.source_schema]["ssh_tmp_dir"]
  File.join(remote_dir, filename)
end

.remove!(ssh, json_file, dump_file) ⇒ Object



129
130
131
132
# File 'lib/transporter/transporter.rb', line 129

# Deletes an export's metadata file and dump file on the remote server.
#
# Issues one `rm` per file over the SSH session, metadata file first.
#
# ssh       - session object responding to exec!
# json_file - remote path of the metadata file
# dump_file - remote path of the dump file
#
# Returns the result of the second (dump file) exec! call.
def self.remove!(ssh, json_file, dump_file)
  [json_file, dump_file].map { |remote_file| ssh.exec!("rm #{remote_file}") }.last
end

.transfer ⇒ Object

Connects to all unique database servers and downloads export files concurrently from multiple sources.



41
42
43
44
45
46
47
# File 'lib/transporter/transporter.rb', line 41

# Downloads export files for every distinct source.
#
# Selects the active exports grouped by source_schema (one export per
# schema) and calls download for each of them in turn.
#
# Returns the collection of exports that was iterated.
def self.transfer
  Export.where("active = 1").group("source_schema").each { |export| download(export) }
end

Instance Method Details

#schedule(cron) ⇒ Object

Schedules the transport job in Resque



29
30
31
32
33
34
35
# File 'lib/transporter/transporter.rb', line 29

# Registers the transporter job with Resque's scheduler.
#
# cron - cron expression describing when the job should run
#
# Returns the result of Resque.set_schedule.
def schedule(cron)
  job_name = "myreplicator_transporter"
  settings = {
    :cron => cron,
    :class => "Myreplicator::Transporter",
    :queue => "myreplicator_transporter"
  }
  Resque.set_schedule(job_name, settings)
end

#tmp_dir ⇒ Object



12
13
14
15
16
# File 'lib/transporter/transporter.rb', line 12

# Local directory that downloaded files are written to:
# <app_root>/tmp/myreplicator.
#
# The directory is created on demand. mkdir_p is used instead of Dir.mkdir
# so that a missing parent "tmp" directory is created as well, and so that a
# directory created by another thread between the check and the mkdir does
# not raise Errno::EEXIST.
#
# Returns the directory path as a String.
def tmp_dir
  require 'fileutils' # loaded here so the method works on its own

  @tmp_dir ||= File.join(Myreplicator.app_root, "tmp", "myreplicator")
  FileUtils.mkdir_p(@tmp_dir)
  @tmp_dir
end