Class: Myreplicator::Loader
- Inherits: Object
- Defined in: lib/loader/loader.rb
Class Method Summary
- .cleanup(metadata) ⇒ Object
  Deletes the metadata file and extract.
- .group_incrementals(incrementals) ⇒ Object
  Groups all incremental files for the same table together and returns an array of arrays. NOTE: each inner array should be processed in the same thread to avoid collisions.
- .incremental_load(metadata) ⇒ Object
  Loads data incrementally, using the values specified in the metadata object.
- .incremental_loads(incrementals) ⇒ Object
  Loads all incremental files, ensuring that multiple loads to the same table happen sequentially.
- .initial_load(metadata) ⇒ Object
  Creates the table and loads the data.
- .initial_loads(initials) ⇒ Object
  Loads all new tables concurrently from multiple files.
- .load ⇒ Object
  Kicks off all initial loads first, then all incrementals, based on metadata files stored locally. Note: initials are loaded sequentially.
- .metadata_files ⇒ Object
- .parallel_load(procs) ⇒ Object
- .perform ⇒ Object
  Main method provided for Resque; reconnection provided for Resque workers.
- .tmp_dir ⇒ Object
- .transfer_completed?(metadata) ⇒ Boolean
  Returns true if the transfer of the file being loaded is complete.
- .unzip(filename) ⇒ Object
  Unzips the file, checking whether it exists or is already unzipped.
Instance Method Summary
- #initialize(*args) ⇒ Loader (constructor)
  A new instance of Loader.
Constructor Details
#initialize(*args) ⇒ Loader
# File 'lib/loader/loader.rb', line 8
def initialize *args
  @args = args
end
Class Method Details
.cleanup(metadata) ⇒ Object
Deletes the metadata file and extract.
# File 'lib/loader/loader.rb', line 216
def self.cleanup metadata
  puts "Cleaning up..."
  FileUtils.rm "#{metadata.destination_filepath(tmp_dir)}.json" # json file
  FileUtils.rm metadata.destination_filepath(tmp_dir)           # dump file
end
.group_incrementals(incrementals) ⇒ Object
Groups all incremental files for the same table together and returns an array of arrays. NOTE: each inner array should be processed in the same thread to avoid collisions.
# File 'lib/loader/loader.rb', line 130
def self.group_incrementals incrementals
  groups = [] # array of all grouped incrementals

  incrementals.each do |metadata|
    group = [metadata]
    incrementals.delete(metadata)

    # look for loads of the same table
    incrementals.each do |md|
      if metadata.equals(md)
        group << md
        incrementals.delete(md) # remove from main array
      end
    end

    groups << group
  end

  return groups
end
.incremental_load(metadata) ⇒ Object
Loads data incrementally, using the values specified in the metadata object.
# File 'lib/loader/loader.rb', line 174
def self.incremental_load metadata
  exp = Export.find(metadata.export_id)
  Loader.unzip(metadata.filename)
  metadata.zipped = false

  options = {:table_name => exp.table_name,
             :db => exp.destination_schema,
             :filepath => metadata.destination_filepath(tmp_dir)}

  if metadata.export_type == "incremental_outfile"
    options[:fields_terminated_by] = ";~;"
    options[:lines_terminated_by] = "\\n"
  end

  cmd = ImportSql.load_data_infile(options)
  puts cmd

  result = `#{cmd}` # execute
  unless result.nil?
    if result.size > 0
      raise Exceptions::LoaderError.new("Incremental Load #{metadata.filename} Failed!\n#{result}")
    end
  end
end
.incremental_loads(incrementals) ⇒ Object
Load all incremental files Ensures that multiple loads to the same table happen sequentially.
# File 'lib/loader/loader.rb', line 99
def self.incremental_loads incrementals
  groups = Loader.group_incrementals incrementals
  procs = []

  groups.each do |group|
    procs << Proc.new {
      group.each do |metadata|
        Log.run(:job_type => "loader",
                :name => "incremental_import",
                :file => metadata.filename,
                :export_id => metadata.export_id) do |log|
          if Loader.transfer_completed? metadata
            Loader.incremental_load metadata
            Loader.cleanup metadata
          end
        end
      end # group
    }
  end # groups

  return procs
end
.initial_load(metadata) ⇒ Object
Creates the table and loads the data.
# File 'lib/loader/loader.rb', line 153
def self.initial_load metadata
  exp = Export.find(metadata.export_id)
  Loader.unzip(metadata.filename)
  metadata.zipped = false

  cmd = ImportSql.initial_load(:db => exp.destination_schema,
                               :filepath => metadata.destination_filepath(tmp_dir))
  puts cmd

  result = `#{cmd}` # execute
  unless result.nil?
    if result.size > 0
      raise Exceptions::LoaderError.new("Initial Load #{metadata.filename} Failed!\n#{result}")
    end
  end
end
.initial_loads(initials) ⇒ Object
Loads all new tables concurrently from multiple files.
# File 'lib/loader/loader.rb', line 72
def self.initial_loads initials
  procs = []

  initials.each do |metadata|
    procs << Proc.new {
      Log.run(:job_type => "loader",
              :name => "#{metadata.export_type}_import",
              :file => metadata.filename,
              :export_id => metadata.export_id) do |log|
        if Loader.transfer_completed? metadata
          Loader.initial_load metadata
          Loader.cleanup metadata
        end
      end
    }
  end

  return procs
end
.load ⇒ Object
Kicks off all initial loads first, then all incrementals, based on metadata files stored locally. Note: initials are loaded sequentially.
# File 'lib/loader/loader.rb', line 31
def self.load
  initials = []
  incrementals = []
  all_files = Loader.metadata_files

  all_files.each do |m|
    if m.export_type == "initial"
      initials << m        # Add initial to the list
      all_files.delete(m)  # Delete obj from mixed list

      all_files.each do |md|
        if m.equals(md) && md.export_type == "incremental"
          initials << md        # incremental should happen after the initial load
          all_files.delete(md)  # remove from current list of files
        end
      end
    end
  end

  incrementals = all_files # Remaining are all incrementals

  initial_procs = Loader.initial_loads initials
  parallel_load initial_procs

  incremental_procs = Loader.incremental_loads incrementals
  parallel_load incremental_procs
end
.metadata_files ⇒ Object
# File 'lib/loader/loader.rb', line 247
def self.metadata_files
  files = []
  Dir.glob(File.join(tmp_dir, "*.json")).each do |json_file|
    files << ExportMetadata.new(:metadata_path => json_file)
  end
  return files
end
.parallel_load(procs) ⇒ Object
# File 'lib/loader/loader.rb', line 59
def self.parallel_load procs
  p = Parallelizer.new(:klass => "Myreplicator::Loader")

  procs.each do |proc|
    p.queue << {:params => [], :block => proc}
  end

  p.run
end
.perform ⇒ Object
Main method provided for Resque; reconnection provided for Resque workers.
# File 'lib/loader/loader.rb', line 20
def self.perform
  ActiveRecord::Base.verify_active_connections!
  ActiveRecord::Base.connection.reconnect!
  load # Kick off the load process
end
.tmp_dir ⇒ Object
# File 'lib/loader/loader.rb', line 12
def self.tmp_dir
  @tmp_dir ||= File.join(Myreplicator.app_root, "tmp", "myreplicator")
end
.transfer_completed?(metadata) ⇒ Boolean
Returns true if the transfer of the file being loaded is complete.
# File 'lib/loader/loader.rb', line 204
def self.transfer_completed? metadata
  if Log.completed?(:export_id => metadata.export_id,
                    :file => metadata.destination_filepath(tmp_dir),
                    :job_type => "transporter")
    return true
  end
  return false
end
.unzip(filename) ⇒ Object
Unzips the file, checking whether it exists or is already unzipped.
# File 'lib/loader/loader.rb', line 226
def self.unzip filename
  cmd = "cd #{tmp_dir}; gunzip #{filename}"
  passed = false

  if File.exist?(File.join(tmp_dir, filename))
    result = `#{cmd}`
    unless result.nil?
      puts result
      unless result.length > 0
        passed = true
      end
    else
      passed = true
    end
  elsif File.exist?(File.join(tmp_dir, filename.gsub(".gz", "")))
    puts "File already unzipped"
    passed = true
  end

  raise Exceptions::LoaderError.new("Unzipping #{filename} Failed!") unless passed
end
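The same "skip when already unzipped" logic can be done in pure Ruby with Zlib instead of shelling out to gunzip. This is a hypothetical alternative, not part of Myreplicator; unzip_file is an illustrative name:

```ruby
require "zlib"
require "tmpdir"

# Hypothetical pure-Ruby counterpart to Loader.unzip: decompress a .gz file
# in dir, but skip the work if the decompressed file already exists.
def unzip_file(dir, filename)
  gz_path  = File.join(dir, filename)
  out_path = File.join(dir, filename.sub(/\.gz\z/, ""))

  return true  if File.exist?(out_path)   # already unzipped
  return false unless File.exist?(gz_path)

  Zlib::GzipReader.open(gz_path) { |gz| File.binwrite(out_path, gz.read) }
  true
end

# Demonstration with a throwaway gzip file in a temp directory.
Dir.mktmpdir do |dir|
  Zlib::GzipWriter.open(File.join(dir, "dump.tsv.gz")) { |gz| gz.write("a\tb\n") }
  unzip_file(dir, "dump.tsv.gz")
  puts File.read(File.join(dir, "dump.tsv"))
end
```

Unlike the shell version, this avoids interpolating the filename into a command string, which also sidesteps quoting issues with unusual filenames.
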