Class: Sneaql::SneaqlStandard
- Inherits:
-
Object
- Object
- Sneaql::SneaqlStandard
- Defined in:
- lib/sneaql_standard.rb
Overview
top level class for interacting with sneaql standard
Instance Attribute Summary collapse
-
#params ⇒ Object
readonly
exposed for unit testing.
-
#q ⇒ Object
Returns the value of attribute q.
Instance Method Summary collapse
-
#build_transform_queue ⇒ Object
creates a threadsafe queue with all active transforms.
-
#configure_jdbc_driver ⇒ Object
configures the jdbc driver into the current context.
-
#create_connection ⇒ JDBCHelpers::ConnectionFactory.connection
creates a jdbc connection based upon current driver context.
-
#create_db_manager ⇒ Class
creates a database manager.
-
#create_db_objects(transform_table_name = nil) ⇒ Object
creates database objects.
-
#create_transforms_table(transform_table_name = nil) ⇒ Object
creates transform_table.
- #get_transforms ⇒ Object
-
#run ⇒ Object
runs all transforms.
-
#run_transforms ⇒ Object
perform concurrent transform run.
-
#set_params ⇒ Object
processes environment variables.
Instance Attribute Details
#params ⇒ Object (readonly)
exposed for unit testing
16 17 18 |
# File 'lib/sneaql_standard.rb', line 16
# Read-only accessor for the parameter hash stored in @params
# (exposed for unit testing).
#
# @return [Object] the current value of @params
def params
  @params
end
#q ⇒ Object
Returns the value of attribute q.
17 18 19 |
# File 'lib/sneaql_standard.rb', line 17
# Reader for the attribute q.
#
# @return [Object] the current value of @q
def q
  @q
end
Instance Method Details
#build_transform_queue ⇒ Object
creates a threadsafe queue with all active transforms
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/sneaql_standard.rb', line 142
# Creates @q, a Queue holding one parameter hash per transform returned
# by get_transforms.
#
# Each hash is a copy of @params extended with:
#   :transform_name, :repo_url, :repo_type ('http' or 'git'),
#   :sql_repository_branch (git only), :compression ('zip', only when the
#   repo url ends in .zip), :step_metadata_manager_type and
#   :step_metadata_file_path.
#
# Any StandardError raised while building the queue is logged (message,
# then each backtrace line) and is NOT re-raised; transforms pushed before
# the failure stay on the queue.
# NOTE(review): callers cannot tell that the queue is incomplete after a
# failure - confirm this best-effort behaviour is intended.
#
# @return [Object] no meaningful return value; the result is left in @q
def build_transform_queue
  # creates a queue to hold all the transform parameter hashes
  @q = Queue.new

  transforms = get_transforms
  logger.info("#{transforms.length} transforms found in database...")

  # push transforms on to queue
  transforms.each do |t|
    # repo must be http or git https; \A anchors at the start of the
    # string so a multi-line value cannot pass on a later line
    raise 'malformed transform definition' unless t['sql_repository'] =~ /\Ahttp/i

    tmp = @params.dup
    tmp[:transform_name] = t['transform_name']
    tmp[:repo_url] = t['sql_repository']

    # determine repo type based upon the presence or absence of branch
    # this comes from sql which is why the casting and strip
    if t['sql_repository_branch'].to_s.strip == ''
      tmp[:repo_type] = 'http'
    else
      tmp[:repo_type] = 'git'
      tmp[:sql_repository_branch] = t['sql_repository_branch']
    end

    tmp[:compression] = 'zip' if tmp[:repo_url] =~ /\.zip$/

    # only step manager option
    tmp[:step_metadata_manager_type] = 'local_file'

    # must be sneaql.json in the base of the sneaql repo
    tmp[:step_metadata_file_path] =
      "#{@params[:repo_base_dir]}/#{tmp[:transform_name]}/sneaql.json"

    @q.push tmp
  end
rescue => e
  logger.error(e.message)
  e.backtrace.each { |b| logger.error(b) }
end
#configure_jdbc_driver ⇒ Object
configures the jdbc driver into the current context
246 247 248 249 250 |
# File 'lib/sneaql_standard.rb', line 246
# Configures the jdbc driver into the current context by building a
# Sneaql::JDBCDriverHandler from @params, then calling
# confirm_jdbc_driver followed by require_jdbc_driver on it.
#
# @return [Object] whatever require_jdbc_driver returns
def configure_jdbc_driver
  driver_handler = Sneaql::JDBCDriverHandler.new(@params)
  driver_handler.confirm_jdbc_driver
  driver_handler.require_jdbc_driver
end
#create_connection ⇒ JDBCHelpers::ConnectionFactory.connection
creates a jdbc connection based upon current driver context
227 228 229 230 231 232 233 234 |
# File 'lib/sneaql_standard.rb', line 227
# Creates a jdbc connection based upon the current driver context, using
# the :jdbc_url, :db_user and :db_pass entries of @params plus the logger.
#
# @return [JDBCHelpers::ConnectionFactory.connection]
def create_connection
  url, user, password = @params.values_at(:jdbc_url, :db_user, :db_pass)
  factory = JDBCHelpers::ConnectionFactory.new(url, user, password, logger)
  factory.connection
end
#create_db_manager ⇒ Class
creates a database manager
238 239 240 241 242 243 |
# File 'lib/sneaql_standard.rb', line 238
# Creates a database manager: looks up the :database class registered
# under @params[:database] via Sneaql::Core.find_class and instantiates it.
#
# @return [Object] a new instance of the class returned by find_class
def create_db_manager
  manager_class = Sneaql::Core.find_class(:database, @params[:database])
  manager_class.new
end
#create_db_objects(transform_table_name = nil) ⇒ Object
creates database objects
29 30 31 32 33 |
# File 'lib/sneaql_standard.rb', line 29
# Creates database objects. Runs, in order: set_params,
# configure_jdbc_driver, then create_transforms_table.
#
# @param transform_table_name [String, nil] passed straight through to
#   create_transforms_table
# @return [Object] whatever create_transforms_table returns
def create_db_objects(transform_table_name = nil)
  # environment first, then the driver, then the table itself
  set_params
  configure_jdbc_driver

  create_transforms_table(transform_table_name)
end
#create_transforms_table(transform_table_name = nil) ⇒ Object
creates transform_table
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/sneaql_standard.rb', line 37
# Creates the transform table, creating its schema first when the name is
# schema qualified and the database is not sqlite.
#
# The connection opened here is always closed before returning. If
# opening the connection itself fails, that error is raised unchanged
# (there is nothing to close).
#
# @param transform_table_name [String, nil] table to create; defaults to
#   'sneaql.transforms' when nil
# @return [Object] whatever DBObjectCreator#create_transforms_table returns
def create_transforms_table(transform_table_name = nil)
  transform_table_name ||= 'sneaql.transforms'

  connection = create_connection
  db_manager = create_db_manager

  # a dotted name indicates a schema qualified object; only create the
  # schema on databases that support schemas
  if transform_table_name =~ /\w+\.\w+/ && @params[:database] != 'sqlite'
    schema_name = transform_table_name[/^\w+/]
    JDBCHelpers::Execute.new(
      connection,
      "create schema if not exists #{schema_name};"
    )
  end

  creator = Sneaql::Standard::DBObjectCreator.new(
    connection,
    db_manager,
    logger
  )
  creator.create_transforms_table(transform_table_name)
ensure
  # connection is nil when create_connection raised; calling close on it
  # would replace the real error with a NoMethodError
  connection.close if connection
end
#get_transforms ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/sneaql_standard.rb', line 183
# Fetches the active transforms from the table named by
# @params[:transform_table_name], ordered by transform_name.
#
# Configures the jdbc driver, builds a database manager, opens a
# connection, runs the query and closes the connection. Errors raised by
# any of those steps propagate to the caller; the connection is closed
# first when one was opened.
#
# @return [Object] the `results` of JDBCHelpers::QueryResultsToArray for
#   the columns transform_name, sql_repository and sql_repository_branch
def get_transforms
  # configure driver and db manager
  configure_jdbc_driver
  db_manager = create_db_manager

  # connect and retrieve transform list
  connection = create_connection

  # databases without a boolean type store the flag as 1
  active_flag = db_manager.has_boolean ? 'true' : 1

  # fetch an array of active transforms
  JDBCHelpers::QueryResultsToArray.new(
    connection,
    %(select
        transform_name
        ,sql_repository
        ,sql_repository_branch
      from
        #{@params[:transform_table_name]}
      where
        is_active = #{active_flag}
      order by
        transform_name;),
    logger
  ).results
ensure
  # no `return` here: returning from ensure would discard any exception in
  # flight. connection is nil when an earlier step raised.
  connection.close if connection
end
#run ⇒ Object
runs all transforms
20 21 22 23 24 25 |
# File 'lib/sneaql_standard.rb', line 20
# Runs all transforms. Steps, in order: set_params,
# configure_jdbc_driver, build_transform_queue, run_transforms.
#
# @return [Object] whatever run_transforms returns
def run
  # environment and driver setup
  set_params
  configure_jdbc_driver

  # queue the work, then execute it
  build_transform_queue
  run_transforms
end
#run_transforms ⇒ Object
perform concurrent transform run
215 216 217 218 219 220 221 222 |
# File 'lib/sneaql_standard.rb', line 215
# Performs the concurrent transform run by instantiating
# ParallelizeSneaqlTransforms with the queue in @q, the configured
# concurrency and the logger.
#
# @return [ParallelizeSneaqlTransforms] the newly created instance
def run_transforms
  worker_count = @params[:concurrency]
  ParallelizeSneaqlTransforms.new(@q, worker_count, logger)
end
#set_params ⇒ Object
processes environment variables
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/sneaql_standard.rb', line 71
# Processes environment variables into @params.
#
# Required variables (a RuntimeError is raised when one is missing or
# fails its validation regex):
#   SNEAQL_JDBC_URL, SNEAQL_DB_USER, SNEAQL_DB_PASS,
#   SNEAQL_JDBC_DRIVER_JAR, SNEAQL_JDBC_DRIVER_CLASS
# Optional variables (with defaults):
#   SNEAQL_JDBC_DRIVER_JAR_MD5 (nil),
#   SNEAQL_METADATA_MANAGER_TYPE ('transform_steps_table'),
#   SNEAQL_REPO_BASE_DIR ('/tmp/sneaql/repos'),
#   SNEAQL_TRANSFORM_CONCURRENCY (1),
#   SNEAQL_TRANSFORM_TABLE_NAME ('sneaql.transforms')
#
# Also sets @params[:database] from Sneaql::Core.database_type.
#
# @raise [StandardError] any error is logged via logger.error and re-raised
# @return [Object] the detected database type
def set_params
  @params = {}

  # each of these lil' hashes represents an env_var that is required, as
  # well as an optional regex validation. \A anchors at the start of the
  # string so a multi-line value cannot pass on a later line.
  required = [
    { var: 'SNEAQL_JDBC_URL', sym: :jdbc_url, validation: /\Ajdbc:.+/i },
    { var: 'SNEAQL_DB_USER', sym: :db_user },
    { var: 'SNEAQL_DB_PASS', sym: :db_pass },
    {
      var: 'SNEAQL_JDBC_DRIVER_JAR',
      sym: :jdbc_driver_jar,
      validation: %r{\A(?:http|file|s3)://.+}i
    },
    { var: 'SNEAQL_JDBC_DRIVER_CLASS', sym: :jdbc_driver_class }
  ]

  required.each do |env_var|
    value = ENV[env_var[:var]]
    raise "required environment variable #{env_var[:var]} not provided" unless value

    # assign the value of the env_var to the symbol key of @params
    @params[env_var[:sym]] = value

    # validate if a validation is provided
    if env_var[:validation] && value !~ env_var[:validation]
      raise "required environment variable #{env_var[:var]} looks invalid"
    end
  end

  # optional env vars are iterated in a similar manner
  # but instead of validation they have a default
  optional = [
    { var: 'SNEAQL_JDBC_DRIVER_JAR_MD5', sym: :jdbc_driver_jar_md5, default: nil },
    {
      var: 'SNEAQL_METADATA_MANAGER_TYPE',
      sym: :step_metadata_manager_type,
      default: 'transform_steps_table'
    },
    { var: 'SNEAQL_REPO_BASE_DIR', sym: :repo_base_dir, default: '/tmp/sneaql/repos' },
    { var: 'SNEAQL_TRANSFORM_CONCURRENCY', sym: :concurrency, default: 1 },
    {
      var: 'SNEAQL_TRANSFORM_TABLE_NAME',
      sym: :transform_table_name,
      default: 'sneaql.transforms'
    }
  ]

  optional.each do |env_var|
    @params[env_var[:sym]] = ENV.fetch(env_var[:var], env_var[:default])
  end

  # numeric parameter provided by env var should be casted
  # NOTE(review): to_i turns a non-numeric value into 0 without complaint -
  # confirm whether a concurrency of 0 is acceptable downstream.
  @params[:concurrency] = @params[:concurrency].to_i

  # determine database type based jdbc url
  # while technically any jdbc driver should work
  # with sneaql, the database type allows for better
  # handling of transactions, boolean, etc.
  @params[:database] = Sneaql::Core.database_type(@params[:jdbc_url])
rescue => e
  logger.error(e.message)
  raise
end