Class: Myreplicator::Loader

Inherits:
Object
  • Object
show all
Defined in:
lib/loader/loader.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Loader



8
9
10
# File 'lib/loader/loader.rb', line 8

# Builds a Loader instance. All real work happens in the class methods.
#
# A trailing options hash is stripped from +args+ via Array#extract_options!
# (an ActiveSupport core extension — presumably loaded by the host app), but
# the options are not used anywhere yet.
def initialize *args
  args.extract_options! # options are currently ignored
end

Class Method Details

.cleanup(metadata) ⇒ Object

Deletes the metadata (.json) file and the extracted dump file.



216
217
218
219
220
# File 'lib/loader/loader.rb', line 216

# Deletes the metadata (.json) file and the dump file for a finished load.
#
# @param metadata [Object] export metadata responding to
#   #destination_filepath(dir); presumably an ExportMetadata — TODO confirm
# @raise [Errno::ENOENT] if either file is missing (FileUtils.rm is strict)
def self.cleanup metadata
  puts "Cleaning up..."
  dump_file = metadata.destination_filepath(tmp_dir)
  FileUtils.rm "#{dump_file}.json" # json file
  FileUtils.rm dump_file # dump file
end

.group_incrementals(incrementals) ⇒ Object

Groups all incremental files for the same table together. Returns an array of arrays. NOTE: Each array should be processed in the same thread to avoid collisions.



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/loader/loader.rb', line 130

# Groups incremental metadata objects that refer to the same load, as decided
# by their #equals method, so that each group can be processed sequentially
# in a single thread.
#
# The input array is NOT modified. The previous implementation deleted
# elements while iterating, which silently skipped files.
#
# @param incrementals [Array] metadata objects responding to #equals(other)
# @return [Array<Array>] groups in first-seen order; members keep their
#   original relative order within each group
def self.group_incrementals incrementals
  groups = [] # array of all grouped incrementals

  incrementals.each do |metadata|
    # The first member of a group is its reference element, matching the
    # original "compare the head against the rest" grouping.
    group = groups.find { |g| g.first.equals(metadata) }
    if group
      group << metadata
    else
      groups << [metadata]
    end
  end

  groups
end

.incremental_load(metadata) ⇒ Object

Loads data incrementally, using the values specified in the metadata object.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/loader/loader.rb', line 174

# Loads one incremental dump into its destination table.
#
# Unzips the dump, builds a load command with ImportSql.load_data_infile and
# runs it through the shell. Any stdout output from the command is treated as
# a failure. Backticks capture stdout only.
# NOTE(review): errors written to stderr are not detected — confirm that the
# generated command reports failures on stdout.
#
# @param metadata [Object] export metadata (export_id, filename, export_type,
#   zipped=, destination_filepath); presumably an ExportMetadata — TODO confirm
# @raise [Exceptions::LoaderError] if the load command prints anything
def self.incremental_load metadata
  exp = Export.find(metadata.export_id)
  Loader.unzip(metadata.filename)
  metadata.zipped = false

  options = {:table_name => exp.table_name, :db => exp.destination_schema,
    :filepath => metadata.destination_filepath(tmp_dir)}

  # incremental_outfile dumps are written with custom field/line delimiters
  if metadata.export_type == "incremental_outfile"
    options[:fields_terminated_by] = ";~;"
    options[:lines_terminated_by] = "\\n" # literal backslash-n for the SQL
  end

  cmd = ImportSql.load_data_infile(options)
  puts cmd

  result = `#{cmd}` # execute
  if result && !result.empty?
    raise Exceptions::LoaderError, "Incremental Load #{metadata.filename} Failed!\n#{result}"
  end
end

.incremental_loads(incrementals) ⇒ Object

Loads all incremental files. Ensures that multiple loads to the same table happen sequentially.



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/loader/loader.rb', line 99

# Builds one Proc per group of incrementals. Files that load into the same
# table are grouped together, and each group's files are loaded one after
# another inside a single Proc, so loads to one table never overlap.
#
# @param incrementals [Array] incremental export metadata objects
# @return [Array<Proc>] one Proc per group, suitable for parallel_load
def self.incremental_loads incrementals
  groups = Loader.group_incrementals incrementals

  groups.map do |group|
    Proc.new {
      group.each do |metadata|
        Log.run(:job_type => "loader",
                :name => "incremental_import",
                :file => metadata.filename,
                :export_id => metadata.export_id) do |_log|
          # Skip files whose transfer has not finished yet
          if Loader.transfer_completed? metadata
            Loader.incremental_load metadata
            Loader.cleanup metadata
          end
        end
      end # group
    }
  end # groups
end

.initial_load(metadata) ⇒ Object

Creates table and loads data



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/loader/loader.rb', line 153

# Creates a table and loads its initial dump.
#
# Unzips the dump, builds a command with ImportSql.initial_load and runs it
# through the shell. Any stdout output from the command is treated as a
# failure. Backticks capture stdout only.
# NOTE(review): errors written to stderr are not detected — confirm.
#
# @param metadata [Object] export metadata (export_id, filename, zipped=,
#   destination_filepath); presumably an ExportMetadata — TODO confirm
# @raise [Exceptions::LoaderError] if the load command prints anything
def self.initial_load metadata
  exp = Export.find(metadata.export_id)
  Loader.unzip(metadata.filename)
  metadata.zipped = false

  cmd = ImportSql.initial_load(:db => exp.destination_schema,
                               :filepath => metadata.destination_filepath(tmp_dir))
  puts cmd

  result = `#{cmd}` # execute
  if result && !result.empty?
    raise Exceptions::LoaderError, "Initial Load #{metadata.filename} Failed!\n#{result}"
  end
end

.initial_loads(initials) ⇒ Object

Loads all new tables concurrently from multiple files.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/loader/loader.rb', line 72

# Builds one Proc per initial-load metadata object. Each Proc logs the job
# and, once the file's transfer has completed, runs the load and cleans up.
#
# @param initials [Array] export metadata objects to load
# @return [Array<Proc>] one Proc per file, suitable for parallel_load
def self.initial_loads initials
  initials.map do |metadata|
    Proc.new {
      Log.run(:job_type => "loader",
              :name => "#{metadata.export_type}_import",
              :file => metadata.filename,
              :export_id => metadata.export_id) do |_log|
        # Skip files whose transfer has not finished yet
        if Loader.transfer_completed? metadata
          Loader.initial_load metadata
          Loader.cleanup metadata
        end
      end
    }
  end
end

.load ⇒ Object

Kicks off all initial loads first and then all incrementals, based on the metadata files stored locally. Note: initial loads (together with any incrementals for the same tables) are dispatched before the remaining incrementals.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/loader/loader.rb', line 31

# Kicks off all initial loads first, then all remaining incrementals, based
# on the metadata files found in tmp_dir.
#
# Incrementals that match an initial export are moved into the initial batch,
# right after their initial, so they are not loaded before the table exists.
# The list is split with partition instead of deleting from it while
# iterating, which used to skip files.
def self.load
  all_files = Loader.metadata_files

  initial_files, remaining = all_files.partition { |m| m.export_type == "initial" }

  initials = []
  initial_files.each do |m|
    initials << m # Add initial to the list
    # incremental should happen after the initial load
    followups, remaining = remaining.partition { |md| m.equals(md) && md.export_type == "incremental" }
    initials.concat(followups)
  end

  incrementals = remaining # Remaining are all incrementals

  initial_procs = Loader.initial_loads initials
  parallel_load initial_procs

  incremental_procs = Loader.incremental_loads incrementals
  parallel_load incremental_procs
end

.metadata_files ⇒ Object



247
248
249
250
251
252
253
# File 'lib/loader/loader.rb', line 247

# Returns one ExportMetadata per "*.json" file found directly in tmp_dir.
#
# @return [Array<ExportMetadata>] metadata objects built from each json path
def self.metadata_files
  Dir.glob(File.join(tmp_dir, "*.json")).map do |json_file|
    ExportMetadata.new(:metadata_path => json_file)
  end
end

.parallel_load(procs) ⇒ Object



59
60
61
62
63
64
65
66
# File 'lib/loader/loader.rb', line 59

# Queues every Proc, with no parameters, on a Parallelizer and runs it.
#
# @param procs [Array<Proc>] work items to execute
# @return whatever Parallelizer#run returns
def self.parallel_load procs
  parallelizer = Parallelizer.new(:klass => "Myreplicator::Loader")
  procs.each { |job| parallelizer.queue << { :params => [], :block => job } }
  parallelizer.run
end

.perform ⇒ Object

Main entry point for Resque. Reconnects to the database before loading, for the benefit of Resque workers.



20
21
22
23
24
# File 'lib/loader/loader.rb', line 20

# Resque entry point. Refreshes the ActiveRecord connection before doing any
# database work (presumably because a long-lived worker may hold a stale
# connection), then runs the full load.
def self.perform
  ActiveRecord::Base.verify_active_connections!
  ActiveRecord::Base.connection.reconnect!
  self.load # the class-level load, not Kernel#load
end

.tmp_dir ⇒ Object



12
13
14
# File 'lib/loader/loader.rb', line 12

# Working directory for dumps and metadata: <app_root>/tmp/myreplicator.
# The path is computed once and memoized.
def self.tmp_dir
  return @tmp_dir if @tmp_dir
  @tmp_dir = File.join(Myreplicator.app_root, "tmp", "myreplicator")
end

.transfer_completed?(metadata) ⇒ Boolean

Returns true if the transfer of the file being loaded is completed



204
205
206
207
208
209
210
211
# File 'lib/loader/loader.rb', line 204

# Returns true if the transporter job for this file has been logged as
# completed, false otherwise (always a strict boolean).
#
# @param metadata [Object] export metadata responding to #export_id and
#   #destination_filepath(dir)
# @return [Boolean]
def self.transfer_completed? metadata
  completed = Log.completed?(:export_id => metadata.export_id,
                             :file => metadata.destination_filepath(tmp_dir),
                             :job_type => "transporter")
  completed ? true : false
end

.unzip(filename) ⇒ Object

Unzips the file. Checks whether the file exists or has already been unzipped.



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/loader/loader.rb', line 226

# Gunzips +filename+ (relative to tmp_dir) in place.
#
# Succeeds if gunzip exits with status 0, or if the file is already unzipped
# (no ".gz" suffix, or only the uncompressed file exists).
# gunzip is invoked in argv form, without a shell, so filenames cannot inject
# commands. Success is judged by exit status because gunzip reports errors on
# stderr, which the old backtick-and-check-stdout approach never saw.
#
# @param filename [String] file name inside tmp_dir, normally ending in ".gz"
# @raise [Exceptions::LoaderError] if unzipping fails or no file is found
def self.unzip filename
  zipped_path = File.join(tmp_dir, filename)
  # strip only a trailing ".gz", not every ".gz" inside the name
  unzipped_path = File.join(tmp_dir, filename.sub(/\.gz\z/, ""))

  passed =
    if filename.end_with?(".gz") && File.exist?(zipped_path)
      system("gunzip", zipped_path) ? true : false # nil if gunzip is missing
    elsif File.exist?(unzipped_path)
      puts "File already unzipped"
      true
    else
      false
    end

  raise Exceptions::LoaderError, "Unzipping #{filename} Failed!" unless passed
end