Module: InfobrightLoader::Loader
Defined in: lib/infobright-loader/loader.rb
Defined Under Namespace
Classes: LoadError
Class Method Summary
- .load_from_folder(folder, table, db, separator = '|', encloser = '') ⇒ Object
  Load a single table in Infobright with the contents of a single folder.
- .load_from_hash(load_hash, db, processes = 10, separator = '|', encloser = '') ⇒ Object
  Load Infobright using a hash of tables to filenames.
- .load_parallel(load_hash, db, processes, separator, encloser) ⇒ Object
  Perform a parallel load.
- .load_serial(load_hash, db, separator, encloser) ⇒ Object
  Perform a serial load. Only used for debugging.
- .load_table(files, table, db, separator, encloser) ⇒ Object
  Load a single table.
Class Method Details
.load_from_folder(folder, table, db, separator = '|', encloser = '') ⇒ Object
Load a single table in Infobright with the contents of a single folder.
# File 'lib/infobright-loader/loader.rb', line 28

def load_from_folder(folder, table, db, separator='|', encloser='')

  # Let's loop through and grab all absolute paths to all the files in this folder, recursively
  load_hash = {}
  load_hash[table] = Dir["#{folder}**/*"].find_all{|f| File.file?(f)}.map{|f| File.expand_path(f)}

  # Check we have some files to load
  unless load_hash[table].any?
    raise LoadError, "No files to load in folder #{folder}"
  end

  # Now we have converted the folder and table
  # into a map, we can use load_from_hash()
  load_from_hash(load_hash, db, 1, separator, encloser)
end
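The folder-to-hash step can be tried in isolation. The following is a minimal sketch, not the library's code: `folder_to_load_hash` is a hypothetical helper, and it builds the glob with `File.join` rather than interpolating the folder directly in front of `**/*` as the listing does, which appears to rely on the folder ending with a path separator. That substitution is an assumption about intent.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical helper (not part of InfobrightLoader): builds the
# { table => [absolute file paths] } hash for a single folder.
def folder_to_load_hash(folder, table)
  files = Dir[File.join(folder, '**', '*')]   # recursive glob
          .select { |f| File.file?(f) }       # drop directories
          .map { |f| File.expand_path(f) }    # absolute paths
  { table => files.sort }
end

# Usage: a throwaway folder with one nested file.
Dir.mktmpdir do |dir|
  FileUtils.mkdir_p(File.join(dir, 'nested'))
  File.write(File.join(dir, 'a.csv'), "1|2\n")
  File.write(File.join(dir, 'nested', 'b.csv'), "3|4\n")
  puts folder_to_load_hash(dir, 'events').inspect
end
```

A hash of this shape is what `load_from_hash` receives as `load_hash`.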
.load_from_hash(load_hash, db, processes = 10, separator = '|', encloser = '') ⇒ Object
Load Infobright using a hash of tables to filenames.
# File 'lib/infobright-loader/loader.rb', line 47

def load_from_hash(load_hash, db, processes=10, separator='|', encloser='')

  # Check we have some tables
  t_count = load_hash.length

  # Some validation about the load we're going to do
  case
  when t_count == 0
    raise LoadError, "We have no tables to populate"
  when t_count < processes
    puts "We have only #{t_count} table(s) to populate, reducing processes from #{processes} to #{t_count}" # TODO: move to Ruby logger?
    processes = t_count
  end

  # Now let's check MySQL server is accessible
  unless InfobrightLoader::Db.running?(db)
    raise LoadError, "Default MySQL server cannot be found or is not running"
  end

  # Now let's check that we can access the database
  unless InfobrightLoader::Db.db_exists?(db)
    raise LoadError, "Database #{db.name} cannot be found or user lacks sufficient privileges"
  end

  # Now we're ready to start with the load - either parallel or serial
  if t_count == 1
    table, files = load_hash.first
    failures = load_table(files, table, db, separator, encloser)
  else
    failures = load_parallel(load_hash, db, processes, separator, encloser)
    # failures = load_serial(load_hash, db, separator, encloser) # For debugging without worrying about threads.
  end

  failures # Return failures
end
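The dispatch decision in this method (how many workers to use, and whether to go through the thread pool at all) can be sketched separately from the database checks. `plan_load` below is a hypothetical helper for illustration only, and it raises `ArgumentError` where the library raises its own `LoadError`.

```ruby
# Hypothetical helper (not part of InfobrightLoader): mirrors the
# table-count validation and the single-table vs parallel decision.
def plan_load(load_hash, processes = 10)
  t_count = load_hash.length
  # Assumption: ArgumentError stands in for the library's LoadError.
  raise ArgumentError, 'We have no tables to populate' if t_count.zero?

  # Never start more workers than there are tables.
  workers = [processes, t_count].min
  mode = t_count == 1 ? :single_table : :parallel
  { mode: mode, processes: workers }
end

puts plan_load({ 'events' => ['/tmp/e.csv'] }).inspect
puts plan_load({ 'a' => [], 'b' => [], 'c' => [] }, 10).inspect
```

With one table the load runs directly in the calling thread; with several tables the worker count is capped at the number of tables.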
.load_parallel(load_hash, db, processes, separator, encloser) ⇒ Object
Perform a parallel load.
# File 'lib/infobright-loader/loader.rb', line 121

def load_parallel(load_hash, db, processes, separator, encloser)

  tables_to_load = load_hash.keys
  table = nil
  threads = []
  files_not_loaded = []
  complete = false
  mutex = Mutex.new

  # If an exception is thrown in a thread that isn't handled, die quickly
  Thread.abort_on_exception = true

  # Create Ruby threads to concurrently execute Infobright loads
  for i in (0...processes)

    # Each thread pops a table off the tables_to_load array, and loads files into it.
    # We loop until there are no more tables to populate.
    threads << Thread.new do
      loop do

        # Critical section
        # Only allow one thread to modify the array at any time
        mutex.synchronize do
          if tables_to_load.length == 0
            complete = true
          end
          table = tables_to_load.pop
        end

        # Let's quit if we have no table to load
        break if complete # Exit the thread

        # Otherwise let's run through and do all the loads for this table
        failures = load_table(load_hash[table], table, db, separator, encloser)

        # Also critical: only one thread should update the failures
        # list at a time
        mutex.synchronize do
          unless failures.empty?
            files_not_loaded.concat failures
          end
        end
      end
    end
  end

  # Wait for threads to finish
  threads.each { |aThread| aThread.join }

  files_not_loaded
end
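In the listing, `table` is declared outside the thread blocks, so every worker assigns to and reads from the same variable, and the read in the `load_table` call happens after the mutex has been released. The sketch below shows one way the same worker-pool pattern could be written so that each worker keeps its own `table`. It swaps the array and mutex for Ruby's thread-safe `Queue`, which is not what the library does. `fake_load_table` and `parallel_load_sketch` are hypothetical names, and the loader is a stub that treats files ending in `.bad` as failures.

```ruby
# Hypothetical stand-in for load_table: returns failure strings for one
# table. Files ending in ".bad" are treated as failed loads.
def fake_load_table(files, table)
  files.select { |f| f.end_with?('.bad') }
       .map { |f| "#{f} (simulated failure loading #{table})" }
end

# Hypothetical sketch of a worker pool draining a queue of table names.
def parallel_load_sketch(load_hash, processes)
  queue = Queue.new
  load_hash.each_key { |t| queue << t }  # enqueue everything up front

  files_not_loaded = []
  mutex = Mutex.new

  threads = Array.new(processes) do
    Thread.new do
      loop do
        begin
          table = queue.pop(true)  # non-blocking; raises ThreadError when empty
        rescue ThreadError
          break                    # nothing left to load, exit this worker
        end
        failures = fake_load_table(load_hash[table], table)
        # Only one thread appends to the shared failures list at a time
        mutex.synchronize { files_not_loaded.concat(failures) }
      end
    end
  end

  threads.each(&:join)
  files_not_loaded
end

puts parallel_load_sketch({ 'a' => ['1.csv', '2.bad'], 'b' => ['3.csv'] }, 2).inspect
```

Because `table` is assigned inside the `loop` block, it is local to each worker, and the queue handles the synchronisation that the array needed a mutex for.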
.load_serial(load_hash, db, separator, encloser) ⇒ Object
Perform a serial load. Only used for debugging.
# File 'lib/infobright-loader/loader.rb', line 106

def load_serial(load_hash, db, separator, encloser)

  files_not_loaded = []
  load_hash.keys.each { |k|
    failures = load_table(load_hash[k], k, db, separator, encloser)
    unless failures.empty?
      files_not_loaded.concat failures
    end
  }

  files_not_loaded
end
.load_table(files, table, db, separator, encloser) ⇒ Object
Load a single table.
# File 'lib/infobright-loader/loader.rb', line 87

def load_table(files, table, db, separator, encloser)

  failures = [] # Files we didn't manage to load

  files.each { |f|
    puts "Loading file #{f} into table #{db.name}.#{table}" # TODO: move to Ruby logger?
    begin
      InfobrightLoader::Db.load_file(f, table, db, separator, encloser)
    rescue LoadError => le
      puts "LOAD ERROR: %s" % le # TODO: move to Ruby logger?
      failures << "%s (%s)" % [f, le]
    end
  }

  failures
end
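Since this module defines its own `LoadError` class, the bare `rescue LoadError` in the listing resolves to that class by constant lookup, not to Ruby's built-in `::LoadError`. The sketch below illustrates that lookup and the `"file (message)"` failure format. `LoaderSketch` is a hypothetical module, the block stands in for `InfobrightLoader::Db.load_file`, and the `StandardError` superclass is an assumption, as the page does not show the real class definition.

```ruby
module LoaderSketch
  # Assumption: the real LoadError's superclass is not shown in the source.
  class LoadError < StandardError; end

  # Calls the given block once per file and collects a failure string
  # for each file whose load raised LoaderSketch::LoadError.
  def self.load_table(files)
    failures = []
    files.each do |f|
      begin
        yield f
      rescue LoadError => le   # resolves to LoaderSketch::LoadError here
        failures << "%s (%s)" % [f, le]
      end
    end
    failures
  end
end

failures = LoaderSketch.load_table(['a.csv', 'b.csv']) do |f|
  raise LoaderSketch::LoadError, 'bad row' if f == 'b.csv'
end
puts failures.inspect
```

Any other exception, including Ruby's own `::LoadError`, is not rescued by this clause and propagates to the caller.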