Module: InfobrightLoader::Loader

Defined in:
lib/infobright-loader/loader.rb

Defined Under Namespace

Classes: LoadError

Class Method Summary collapse

Class Method Details

.load_from_folder(folder, table, db, separator = '|', encloser = '') ⇒ Object

Load a single table in Infobright with the contents of a single folder



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/infobright-loader/loader.rb', line 28

# Load a single table in Infobright with the contents of a single folder.
#
# Every regular file found under +folder+ (searched recursively) is
# collected as an absolute path and handed to load_from_hash() as a
# one-table load.
#
# Parameters:
# +folder+::    path of the folder to scan; a trailing separator is optional
# +table+::     name of the table to populate
# +db+::        database descriptor, passed through to load_from_hash()
# +separator+:: field separator, passed through to load_from_hash()
# +encloser+::  field encloser, passed through to load_from_hash()
#
# Returns whatever load_from_hash() returns for this one-table load.
# Raises LoadError if the folder contains no files.
def load_from_folder(folder, table, db, separator='|', encloser='')

  # Build the recursive glob with File.join so it works with or without a
  # trailing separator on the folder. Plain interpolation ("#{folder}**/*")
  # silently turned "dir" into the non-recursive pattern "dir**/*", which
  # also matched sibling directories sharing the same prefix.
  # An empty folder keeps its old meaning: the current working directory.
  base = folder.to_s
  base = '.' if base.empty?
  pattern = File.join(base, '**', '*')

  # Keep regular files only (directories are skipped), as absolute paths
  files = Dir[pattern].select { |f| File.file?(f) }.map { |f| File.expand_path(f) }

  # Check we have some files to load
  if files.empty?
    raise LoadError, "No files to load in folder #{folder}"
  end

  # Now we have converted the folder and table into a
  # hash, we can use load_from_hash() with a single process
  load_from_hash({ table => files }, db, 1, separator, encloser)
end

.load_from_hash(load_hash, db, processes = 10, separator = '|', encloser = '') ⇒ Object

Load Infobright using a hash of tables to filenames.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/infobright-loader/loader.rb', line 47

# Load Infobright using a hash of tables to filenames.
#
# Parameters:
# +load_hash+:: hash mapping each table name to the list of files to load
#               into that table
# +db+::        database descriptor; must respond to +name+ and is passed
#               to InfobrightLoader::Db for the connectivity checks
# +processes+:: maximum number of concurrent loads; capped at the number of
#               tables and raised to 1 if a smaller value is given
# +separator+:: field separator, passed through to the table loaders
# +encloser+::  field encloser, passed through to the table loaders
#
# Returns the list of failures reported by load_table() / load_parallel().
# Raises LoadError if there are no tables, the server is not reachable, or
# the database cannot be accessed.
def load_from_hash(load_hash, db, processes=10, separator='|', encloser='')

  # Check we have some tables
  t_count = load_hash.length
  if t_count == 0
    raise LoadError, "We have no tables to populate"
  end

  # Some validation about the load we're going to do
  if processes < 1
    # With no workers a multi-table load would start no threads at all and
    # report success without loading anything, so fall back to one process.
    puts "Invalid number of processes (#{processes}), using 1 process instead" # TODO: move to Ruby logger?
    processes = 1
  elsif t_count < processes
    puts "We have only #{t_count} table(s) to populate, reducing processes from #{processes} to #{t_count}" # TODO: move to Ruby logger?
    processes = t_count
  end

  # Now let's check MySQL server is accessible
  unless InfobrightLoader::Db.running?(db)
    raise LoadError, "Default MySQL server cannot be found or is not running"
  end

  # Now let's check that we can access the database
  unless InfobrightLoader::Db.db_exists?(db)
    raise LoadError, "Database #{db.name} cannot be found or user lacks sufficient privileges"
  end

  # Now we're ready to start with the load - either parallel or serial
  if t_count == 1
    table, files = load_hash.first
    failures = load_table(files, table, db, separator, encloser)
  else
    failures = load_parallel(load_hash, db, processes, separator, encloser)
    # failures = load_serial(load_hash, db, separator, encloser) # For debugging without worrying about threads.
  end

  failures # Return failures
end

.load_parallel(load_hash, db, processes, separator, encloser) ⇒ Object

Perform a parallel load



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/infobright-loader/loader.rb', line 121

# Perform a parallel load.
#
# Starts +processes+ Ruby threads. Each thread repeatedly claims one table
# from the shared list of pending tables and loads all of that table's files
# with load_table(), until no tables are left.
#
# Parameters:
# +load_hash+:: hash mapping each table name to the list of files to load
# +db+::        database descriptor, passed through to load_table()
# +processes+:: number of worker threads to start
# +separator+:: field separator, passed through to load_table()
# +encloser+::  field encloser, passed through to load_table()
#
# Returns the combined list of failures reported by load_table() across all
# tables (order depends on thread scheduling).
#
# Note: sets Thread.abort_on_exception to true, which is a process-wide
# setting and stays in effect after this method returns.
def load_parallel(load_hash, db, processes, separator, encloser)

  tables_to_load = load_hash.keys
  files_not_loaded = []
  mutex = Mutex.new

  # If an exception is thrown in a thread that isn't handled, die quickly
  Thread.abort_on_exception = true

  # Create Ruby threads to concurrently execute Infobright loads
  threads = (0...processes).map do

    # Each thread pops a table off the tables_to_load array, and loads files into it.
    # We loop until there are no more tables to populate.
    Thread.new do
      loop do

        # The claimed table must be local to this thread: if it were shared,
        # another thread could overwrite it (or flag completion) between the
        # pop and the load, so a table could be loaded twice or skipped.
        table = nil

        # Critical section
        # Only allow one thread to modify the array at any time
        finished = mutex.synchronize do
          if tables_to_load.empty?
            true
          else
            table = tables_to_load.pop
            false
          end
        end

        # Let's quit if we have no table to load
        break if finished # Exit the thread

        # Otherwise let's run through and do all the loads for this table
        failures = load_table(load_hash[table], table, db, separator, encloser)

        # Also critical: only one thread should update the failures
        # list at a time
        mutex.synchronize { files_not_loaded.concat(failures) }

      end
    end
  end

  # Wait for threads to finish
  threads.each(&:join)
  files_not_loaded
end

.load_serial(load_hash, db, separator, encloser) ⇒ Object

Perform a serial load. Only used for debugging.



106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/infobright-loader/loader.rb', line 106

# Perform a serial load. Only used for debugging.
#
# Walks the tables of +load_hash+ one after another, loading each table's
# files with load_table(), and returns the accumulated list of failures.
def load_serial(load_hash, db, separator, encloser)
  load_hash.each_with_object([]) do |(table_name, table_files), not_loaded|
    # Appending an empty failure list is a no-op, so no emptiness check is needed
    not_loaded.concat(load_table(table_files, table_name, db, separator, encloser))
  end
end

.load_table(files, table, db, separator, encloser) ⇒ Object

Load a single table



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/infobright-loader/loader.rb', line 87

# Load a single table.
#
# Hands each file in +files+ to InfobrightLoader::Db.load_file, in order.
# A LoadError raised for one file is reported and recorded, and the
# remaining files are still attempted.
#
# Returns an array with one "file (error message)" string per file that
# could not be loaded; empty when every file loaded.
def load_table(files, table, db, separator, encloser)
  files.each_with_object([]) do |path, not_loaded|
    puts "Loading file #{path} into table #{db.name}.#{table}" # TODO: move to Ruby logger?
    begin
      InfobrightLoader::Db.load_file(path, table, db, separator, encloser)
    rescue LoadError => error
      puts format("LOAD ERROR: %s", error) # TODO: move to Ruby logger?
      not_loaded << format("%s (%s)", path, error)
    end
  end
end