Class: Myreplicator::Loader

Inherits:
Object
  • Object
show all
Defined in:
lib/loader/loader.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Loader

Returns a new instance of Loader.



9
10
11
# File 'lib/loader/loader.rb', line 9

def initialize *args
  # Accept a trailing options hash (ActiveSupport's Array#extract_options!).
  # NOTE(review): the extracted options are not used here — presumably kept
  # for interface compatibility; confirm before removing.
  options = args.extract_options!
end

Class Method Details

.cleanup(metadata) ⇒ Object

Deletes the metadata file and extract



334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/loader/loader.rb', line 334

# Deletes the metadata file (json) and the extract (dump file) for the
# given metadata object. Each deletion error is reported but does not stop
# the other deletion; the first error encountered is re-raised as a
# LoaderError at the end.
#
# NOTE(review): rescues Exception (not StandardError) to preserve the
# original behavior; consider narrowing.
def self.cleanup metadata
  puts "Cleaning up..."
  e1 = nil
  e2 = nil
  begin
    FileUtils.rm metadata.metadata_filepath(tmp_dir) # json file
  rescue Exception => e
    e1 = e
    puts e.message
  end
  begin
    FileUtils.rm metadata.destination_filepath(tmp_dir) # dump file
  rescue Exception => e
    e2 = e
    puts e.message
  end
  if (!e1.blank?)
    raise Exceptions::LoaderError.new("#{e1.message}")
  end
  if (!e2.blank?)
    raise Exceptions::LoaderError.new("#{e2.message}")
  end
end

.clear_older_files(metadata) ⇒ Object

Clears files that are older than the passed metadata file. Note: This method is provided to ensure no old incremental files ever get loaded after the schema-change algorithm has been applied



404
405
406
407
408
409
410
411
412
413
414
415
416
417
# File 'lib/loader/loader.rb', line 404

# Clears metadata/extract files that are older than the passed metadata
# file for the same export. Provided to ensure no old incremental files
# ever get loaded after the schema-change algorithm has been applied.
def self.clear_older_files metadata
  files = Loader.metadata_files
  max_date = DateTime.strptime(metadata.export_time)
  files.each do |m|
    if metadata.export_id == m.export_id
      if max_date > DateTime.strptime(m.export_time)
        # never remove the reference file itself
        Loader.cleanup m if metadata.filepath != m.filepath
      end
    end
  end
end

.group_incrementals(incrementals) ⇒ Object

Groups all incremental files for the same table together. Returns an array of arrays. NOTE: Each inner array should be processed in the same thread to avoid collisions



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/loader/loader.rb', line 210

# Groups all incremental files for the same table together.
# Returns an array of arrays; each inner array starts with the lead
# metadata object followed by every other file it .equals.
# NOTE: each inner array should be processed in the same thread to avoid
# collisions.
#
# Fixed: the original deleted elements from +incrementals+ while
# iterating it with #each, which silently skipped files; consuming the
# array with shift/partition processes every element exactly once.
def self.group_incrementals incrementals
  groups = [] # array of all grouped incrementals

  until incrementals.empty?
    metadata = incrementals.shift
    group = [metadata]

    # pull out every remaining file that loads into the same table
    matches, rest = incrementals.partition { |md| metadata.equals(md) }
    group.concat(matches)
    incrementals.replace(rest)

    groups << group
  end
  groups
end

.incremental_load(metadata) ⇒ Object

Loads data incrementally. Uses the values specified in the metadata object



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/loader/loader.rb', line 268

# Loads one file's data incrementally, using the values specified in the
# metadata object. Dispatches on metadata.export_to: "vertica" delegates
# to load_to_vertica; "mysql" shells out to a LOAD DATA INFILE command.
def self.incremental_load metadata
  exp = Myreplicator::Export.find(metadata.export_id)

  options = {:table_name => exp.table_name,
    :db => exp.destination_schema,
    :filepath => metadata.destination_filepath(tmp_dir),
    :source_schema => exp.source_schema,
    :fields_terminated_by => "\\0",
    :lines_terminated_by => "\\n"}

  case metadata.export_to
  when "vertica"
    Loader.load_to_vertica options, metadata, exp
  when "mysql"
    cmd = ImportSql.load_data_infile(options)
    puts cmd
    result = `#{cmd}` # execute
    # the mysql client is silent on success; any output means failure
    unless result.nil?
      if result.size > 0
        raise Exceptions::LoaderError.new("Incremental Load #{metadata.filename} Failed!\n#{result}")
      end
    end
  end # case
end

.incremental_loads(incrementals) ⇒ Object

Load all incremental files Ensures that multiple loads to the same table happen sequentially.



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/loader/loader.rb', line 179

# Builds the load procs for all incremental files.
# Files for the same table are grouped so that multiple loads to the same
# table happen sequentially inside one proc; the procs themselves may run
# in parallel. Returns the array of procs (nothing is executed here).
def self.incremental_loads incrementals
  groups = Loader.group_incrementals incrementals
  procs = []
  groups.each do |group|
    procs << Proc.new {
      group.each do |metadata|
        Myreplicator::Log.run(:job_type => "loader",
                :name => "incremental_import",
                :file => metadata.filename,
                :export_id => metadata.export_id) do |log|

          # only load files whose transport has finished
          if Myreplicator::Loader.transfer_completed? metadata
            Myreplicator::Loader.incremental_load metadata
            Myreplicator::Loader.cleanup metadata
          end

        end
      end # group
    }
  end # groups

  return procs
end

.initial_load(metadata) ⇒ Object

Creates table and loads data



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/loader/loader.rb', line 233

# Creates the destination table and loads the full dump for the given
# metadata object. Gzipped dumps are gunzipped first, loaded, then
# re-gzipped. Raises LoaderError when the load command produces output
# (the mysql client is silent on success).
def self.initial_load metadata
  exp = Myreplicator::Export.find(metadata.export_id)
  filename = metadata.filename
  if filename.split('.').last == 'gz'
    filepath = metadata.destination_filepath(tmp_dir)
    cmd = "gunzip #{filepath}"
    system(cmd)
    # path of the dump with the trailing ".gz" dropped
    unzip_file = File.join(tmp_dir, filename.split('.')[0..-2].join('.'))
    cmd = Myreplicator::ImportSql.initial_load(:db => exp.destination_schema,
                                     :filepath => unzip_file.to_s)
    puts cmd
    result = `#{cmd} 2>&1` # execute
    cmd2 = "gzip #{unzip_file.to_s}"
    system(cmd2)
  else
    cmd = Myreplicator::ImportSql.initial_load(:db => exp.destination_schema,
                                     :filepath => metadata.destination_filepath(tmp_dir))
    puts cmd
    result = `#{cmd} 2>&1` # execute
  end

  unless result.nil?
    if result.size > 0
      raise Exceptions::LoaderError.new("Initial Load #{metadata.filename} Failed!\n#{result}")
    end
  end
end

.initial_loads(initials) ⇒ Object

Loads all new tables concurrently, multiple files at a time



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/loader/loader.rb', line 148

# Builds one load proc per new table so initial loads can run
# concurrently across multiple files. Returns the array of procs
# (nothing is executed here).
def self.initial_loads initials
  procs = []

  initials.each do |metadata|
    procs << Proc.new {
      Myreplicator::Log.run(:job_type => "loader",
              :name => "#{metadata.export_type}_import",
              :file => metadata.filename,
              :export_id => metadata.export_id) do |log|

        if Myreplicator::Loader.transfer_completed? metadata
          # vertica initials go through the incremental path
          if metadata.export_to == "vertica"
            Myreplicator::Loader.incremental_load metadata
          else
            Myreplicator::Loader.initial_load metadata
          end
          Myreplicator::Loader.cleanup metadata
        end

      end
    }
  end

  return procs
end

.loadObject

Kicks off all initial loads first and then all incrementals Looks at metadata files stored locally Note: Initials are loaded sequentially



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/loader/loader.rb', line 50

# Kicks off all loads: initial loads first, then incrementals, based on
# metadata files stored locally. Work is coordinated through Redis: a set
# ("myreplicator_load_set") acts as the queue of filepaths to load, and a
# hash ("myreplicator_load_hash") marks files already loaded (value '1')
# so concurrent loaders do not double-load them.
def self.load
  all_files = Myreplicator::Loader.metadata_files

  metadata_hash = {} # filepath => metadata object
  @redis = Redis.new(:host => Settings[:redis][:host], :port => Settings[:redis][:port])
  @load_set = "myreplicator_load_set"
  @load_hash = "myreplicator_load_hash"

  # clear out entries for files that were already loaded ('1')
  @redis.hgetall(@load_hash).each do |k, v|
    if @redis.hget(@load_hash, k) == '1'
      @redis.hdel(@load_hash, k)
    end
  end

  # check if there is any other running loader; if not, reset the load_hash
  cmd = "ps aux | grep 'Processing myreplicator_load'"
  result = `#{cmd} 2>&1`
  if (result.split("Processing myreplicator_load since").size <= 2)
    @redis.hgetall(@load_hash).each do |k, v|
      @redis.hdel(@load_hash, k)
    end
  end

  # map filepath => metadata and queue unseen, fully-transferred files
  all_files.each do |m|
    if Myreplicator::Loader.transfer_completed? m
      if !(@redis.hexists(@load_hash, m.filepath))
        @redis.hset(@load_hash, m.filepath, 0)
        @redis.sadd(@load_set, m.filepath)
      end
      metadata_hash[m.filepath] = m
    end
  end

  # process the files in the "queue"
  while @redis.smembers(@load_set).size > 0
    filepath = @redis.spop(@load_set)

    metadata = metadata_hash[filepath]
    next if metadata.blank?

    if metadata.export_type == "initial"
      if Myreplicator::Loader.transfer_completed? metadata
        Myreplicator::Log.run(:job_type => "loader",
        :name => "#{metadata.export_type}_import",
        :file => metadata.filename,
        :export_id => metadata.export_id) do |log|
          # vertica initials go through the incremental path
          if metadata.export_to == "vertica"
            Myreplicator::Loader.incremental_load metadata
          else
            Myreplicator::Loader.initial_load metadata
          end
          Myreplicator::Loader.cleanup metadata
        end
        @redis.hset(@load_hash, metadata.filepath, 1)
      else # transporter not done yet, return the file to @load_set
        @redis.sadd(@load_set, metadata.filepath)
      end

    else # incremental load
      if Myreplicator::Loader.transfer_completed? metadata
        Myreplicator::Log.run(:job_type => "loader",
        :name => "incremental_import",
        :file => metadata.filename,
        :export_id => metadata.export_id) do |log|
          Myreplicator::Loader.incremental_load metadata
          Myreplicator::Loader.cleanup metadata
        end
        @redis.hset(@load_hash, metadata.filepath, 1)
      else # transporter not done yet, return the file to @load_set
        @redis.sadd(@load_set, metadata.filepath)
      end
    end
    sleep(2)
  end # end while
end

.load_to_vertica(options, metadata, exp) ⇒ Object

Load to Vertica



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/loader/loader.rb', line 298

# Loads one file into Vertica via VerticaLoader.
# NOTE(review): the incoming +options+ argument is immediately discarded
# and rebuilt from +exp+ and +metadata+; the parameter is kept only for
# interface compatibility with callers.
def self.load_to_vertica options, metadata, exp
  options = {:table_name => exp.table_name,
    :db => ActiveRecord::Base.configurations["vertica"]["database"],
    :filepath => metadata.destination_filepath(tmp_dir),
    :source_schema => exp.source_schema, :export_id => metadata.export_id,
    :metadata => metadata
  }

  options[:destination_schema] = exp.destination_schema

  result = Myreplicator::VerticaLoader.load options

  ##TO DO: Handle unsuccessful vertica loads here

end

.metadata_filesObject



383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# File 'lib/loader/loader.rb', line 383

# Builds an ExportMetadata object for every *.json metadata file found in
# tmp_dir and returns them as an array.
def self.metadata_files
  files = []
  Dir.glob(File.join(tmp_dir, "*.json")).each do |json_file|
    files << Myreplicator::ExportMetadata.new(:metadata_path => json_file)
  end
  result = []
  files.each do |file|
    job = Export.where("id = #{file.export_id}").first
    # NOTE(review): the state filter below is disabled, so every file is
    # returned and `job` is currently unused — confirm before removing.
    #if job.state == "transport_completed"
    result << file
    #end
  end
  return result
end

.mysql_table_definition(options) ⇒ Object



419
420
421
422
423
424
425
426
427
428
429
# File 'lib/loader/loader.rb', line 419

# Fetches column definitions for a table from MySQL's INFORMATION_SCHEMA.
# Expects options[:table] and options[:source_schema].
# WARNING: the table and schema names are interpolated directly into the
# SQL string — callers must never pass untrusted values here.
def self.mysql_table_definition options
  sql = "SELECT table_schema, table_name, column_name, is_nullable, data_type, column_type, column_key " \
        "FROM INFORMATION_SCHEMA.COLUMNS where table_name = '#{options[:table]}' " \
        "and table_schema = '#{options[:source_schema]}';"

  puts sql

  desc = Myreplicator::DB.exec_sql(options[:source_schema], sql)
  puts desc
  desc
end

.parallel_load(procs) ⇒ Object



135
136
137
138
139
140
141
142
# File 'lib/loader/loader.rb', line 135

# Queues the given procs on a Parallelizer bound to this class and runs
# them. Returns whatever Parallelizer#run returns.
def self.parallel_load procs
  runner = Parallelizer.new(:klass => "Myreplicator::Loader")
  procs.each { |job| runner.queue << {:params => [], :block => job} }
  runner.run
end

.perform(*args) ⇒ Object

Main method provided for Resque. Connection re-establishment is performed for Resque workers



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/loader/loader.rb', line 22

# Main entry point for Resque. Accepts an optional :id in the trailing
# options hash; with no id the full load cycle runs, otherwise only the
# one export is loaded.
#
# Fixed: the ActiveRecord reconnection (needed because Resque workers
# fork) was duplicated in both branches — hoisted before the branch.
def self.perform *args
  options = args.extract_options!
  id = options[:id]
  ActiveRecord::Base.verify_active_connections!
  ActiveRecord::Base.connection.reconnect!
  if id.blank?
    load # Kick off the full load process
  else
    load_id(id) # Load a single export
  end
end

.tmp_dirObject



13
14
15
16
# File 'lib/loader/loader.rb', line 13

# Scratch directory where dump and metadata files are staged.
# Memoized on first access.
#@tmp_dir ||= File.join(Myreplicator.app_root,"tmp", "myreplicator")
def self.tmp_dir
  return @tmp_dir unless @tmp_dir.nil?
  @tmp_dir = Myreplicator.tmp_path
end

.transfer_completed?(metadata) ⇒ Boolean

Returns true if the transfer of the file being loaded is completed

Returns:

  • (Boolean)


318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/loader/loader.rb', line 318

# Returns true if the transfer of the file being loaded has completed,
# i.e. the transporter job for this export/file pair is logged as done.
# Always returns a strict true/false.
def self.transfer_completed? metadata
  if Log.completed?(:export_id => metadata.export_id,
                    :file => metadata.export_path,
                    :job_type => "transporter")
    return true
  end
  return false
end

.unzip(filename) ⇒ Object

Unzips a file. Checks whether the file exists or is already unzipped



362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/loader/loader.rb', line 362

# Unzips +filename+ inside tmp_dir.
# Skips the work when an already-unzipped copy exists; raises LoaderError
# when gunzip produced any output (it is silent on success) or the file
# is missing entirely.
def self.unzip filename
  cmd = "cd #{tmp_dir}; gunzip #{filename}"
  passed = false
  if File.exist?(File.join(tmp_dir,filename))
    result = `#{cmd}`
    if result.nil?
      passed = true
    else
      puts result
      # gunzip is silent on success; any output means something went wrong
      passed = true unless result.length > 0
    end
  elsif File.exist?(File.join(tmp_dir,filename.gsub(".gz","")))
    puts "File already unzipped"
    passed = true
  end

  raise Exceptions::LoaderError.new("Unzipping #{filename} Failed!") unless passed
end

Instance Method Details

#load_id(id) ⇒ Object

Running loader for 1 export object



39
40
41
42
43
# File 'lib/loader/loader.rb', line 39

# Runs the loader for a single export object identified by +id+.
# NOTE(review): currently a no-op — the Resque enqueue calls below are
# commented out; confirm intended behavior before relying on this.
def load_id id
  
  #Resque.enqueue(Myreplicator::Loader, id)
  #Resque.enqueue(Myreplicator::Export,342)
end