Class: Sneaql::SneaqlStandard

Inherits:
Object
  • Object
show all
Defined in:
lib/sneaql_standard.rb

Overview

Top-level class for interacting with SneaQL Standard.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#paramsObject (readonly)

exposed for unit testing



16
17
18
# File 'lib/sneaql_standard.rb', line 16

# Reader for the parameter hash stored in @params.
#
# @return [Object] the current value of @params (nil if it was never assigned)
def params
  @params
end

#qObject

Returns the value of attribute q.



17
18
19
# File 'lib/sneaql_standard.rb', line 17

# Reader for the object stored in @q.
#
# @return [Object] the current value of @q (nil if it was never assigned)
def q
  @q
end

Instance Method Details

#build_transform_queueObject

Creates a thread-safe queue containing the parameters for every active transform.



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/sneaql_standard.rb', line 142

# Builds the queue of transform parameter hashes and stores it in @q.
#
# @q is replaced with a fresh Queue, the rows returned by get_transforms are
# iterated, and one hash per row is pushed. Each hash is a copy of @params
# extended with :transform_name, :repo_url, :repo_type,
# :step_metadata_manager_type and :step_metadata_file_path, plus
# :sql_repository_branch (git repos only) and :compression (urls ending in
# .zip only).
#
# Any StandardError raised along the way (including the
# 'malformed transform definition' check) is logged and NOT re-raised, so @q
# keeps only the entries pushed before the failure.
#
# @return [Object] the collection returned by get_transforms on success
def build_transform_queue
  # creates a queue to hold all the transform parameter hashes
  @q = Queue.new

  transforms = get_transforms
  logger.info("#{transforms.length} transforms found in database...")

  # push transforms on to queue
  transforms.each do |t|
    tmp = {}.merge(@params)
    tmp[:transform_name] = t['transform_name']

    # repo must be http or git https
    # \A (not ^) so that the value must START with http; ^ would also accept
    # a multi-line value where only a later line begins with http
    raise 'malformed transform definition' unless t['sql_repository'] =~ /\Ahttp.*/i

    tmp[:repo_url] = t['sql_repository']

    # determine repo type based upon the presence or absence of branch
    # this comes from sql which is why the casting and strip
    if t['sql_repository_branch'].to_s.strip == ''
      tmp[:repo_type] = 'http'
    else
      tmp[:repo_type] = 'git'
      tmp[:sql_repository_branch] = t['sql_repository_branch']
    end

    tmp[:compression] = 'zip' if tmp[:repo_url] =~ /.*\.zip$/

    # only step manager option; overrides whatever @params carried
    tmp[:step_metadata_manager_type] = 'local_file'

    # must be sneaql.json in the base of the sneaql repo
    tmp[:step_metadata_file_path] = "#{@params[:repo_base_dir]}/#{tmp[:transform_name]}/sneaql.json"

    @q.push tmp
  end
rescue => e
  # failures are logged (message, then each backtrace line) and swallowed
  logger.error(e.message)
  e.backtrace.each { |b| logger.error(b) }
end

#configure_jdbc_driverObject

configures the jdbc driver into the current context



246
247
248
249
250
# File 'lib/sneaql_standard.rb', line 246

# Sets up the JDBC driver described by @params.
#
# Builds a Sneaql::JDBCDriverHandler from @params, calls confirm_jdbc_driver
# on it and then require_jdbc_driver.
#
# @return [Object] whatever require_jdbc_driver returns
def configure_jdbc_driver
  Sneaql::JDBCDriverHandler
    .new(@params)
    .tap { |driver_handler| driver_handler.confirm_jdbc_driver }
    .require_jdbc_driver
end

#create_connectionJDBCHelpers::ConnectionFactory.connection

creates a jdbc connection based upon current driver context

Returns:

  • (JDBCHelpers::ConnectionFactory.connection)


227
228
229
230
231
232
233
234
# File 'lib/sneaql_standard.rb', line 227

# Opens a JDBC connection using the credentials held in @params.
#
# A JDBCHelpers::ConnectionFactory is built from @params[:jdbc_url],
# @params[:db_user], @params[:db_pass] and the logger.
#
# @return [Object] the value of the factory's #connection
def create_connection
  factory = JDBCHelpers::ConnectionFactory.new(
    *@params.values_at(:jdbc_url, :db_user, :db_pass),
    logger
  )
  factory.connection
end

#create_db_managerClass

creates a database manager

Returns:

  • (Class)


238
239
240
241
242
243
# File 'lib/sneaql_standard.rb', line 238

# Instantiates the database manager matching @params[:database].
#
# The class is looked up through Sneaql::Core.find_class(:database, ...).
#
# @return [Object] a new instance of the class returned by find_class
def create_db_manager
  manager_class = Sneaql::Core.find_class(:database, @params[:database])
  manager_class.new
end

#create_db_objects(transform_table_name = nil) ⇒ Object

creates database objects

Parameters:

  • transform_table_name (String) (defaults to: nil)

    if provided, replaces the default table name sneaql.transforms



29
30
31
32
33
# File 'lib/sneaql_standard.rb', line 29

# Entry point for creating the database objects.
#
# Runs, in order: set_params, configure_jdbc_driver and
# create_transforms_table.
#
# @param transform_table_name [String, nil] passed straight through to
#   create_transforms_table
# @return [Object] whatever create_transforms_table returns
def create_db_objects(transform_table_name = nil)
  set_params
  configure_jdbc_driver
  create_transforms_table(transform_table_name)
end

#create_transforms_table(transform_table_name = nil) ⇒ Object

creates transform_table

Parameters:

  • transform_table_name (String) (defaults to: nil)

    if provided, replaces the default table name sneaql.transforms



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/sneaql_standard.rb', line 37

# Creates the transforms table, creating its schema first when needed.
#
# When the table name looks schema qualified (word.word) and the database is
# not sqlite, a "create schema if not exists" statement is executed for the
# leading word before the table is created through
# Sneaql::Standard::DBObjectCreator.
#
# The connection is closed on the way out. If create_connection itself fails
# there is nothing to close and the original error propagates unchanged.
#
# @param transform_table_name [String, nil] table to create; falls back to
#   'sneaql.transforms' when nil
# @return [Object] whatever DBObjectCreator#create_transforms_table returns
def create_transforms_table(transform_table_name = nil)
  transform_table_name ||= 'sneaql.transforms'

  connection = create_connection
  db_manager = create_db_manager

  # a dotted name indicates a schema qualified object;
  # sqlite is skipped because it is treated as not supporting schemas
  schema_qualified = transform_table_name =~ /\w+\.\w+/
  if schema_qualified && !['sqlite'].include?(@params[:database])
    # create schema if needed
    JDBCHelpers::Execute.new(
      connection,
      "create schema if not exists #{transform_table_name[/^\w+/]};"
    )
  end

  creator = Sneaql::Standard::DBObjectCreator.new(
    connection,
    db_manager,
    logger
  )
  creator.create_transforms_table(transform_table_name)
ensure
  # connection is nil when create_connection raised; calling close on it
  # would replace the real error with a NoMethodError
  connection.close if connection
end

#get_transformsObject



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/sneaql_standard.rb', line 183

# Fetches the active transforms from the transforms table.
#
# Configures the JDBC driver, opens a connection and selects
# transform_name, sql_repository and sql_repository_branch from
# @params[:transform_table_name] where is_active is set, ordered by
# transform_name. The is_active literal is 'true' or 1 depending on the
# database manager's has_boolean.
#
# The connection is closed on the way out. Errors from the driver setup, the
# connection or the query propagate to the caller rather than being turned
# into a nil return value.
#
# @return [Object] the #results of JDBCHelpers::QueryResultsToArray
def get_transforms
  # configure driver and db manager
  configure_jdbc_driver
  db_manager = create_db_manager

  # connect and retrieve transform list
  connection = create_connection

  # fetch an array of active transforms
  JDBCHelpers::QueryResultsToArray.new(
    connection,
    %(select
        transform_name
        ,sql_repository
        ,sql_repository_branch
      from
        #{@params[:transform_table_name]}
      where
        is_active = #{db_manager.has_boolean ? 'true' : 1}
      order by
        transform_name;),
    logger
  ).results
ensure
  # no `return` here: returning from ensure would discard any exception.
  # connection is nil when a step before/at create_connection failed
  connection.close if connection
end

#runObject

runs all transforms



20
21
22
23
24
25
# File 'lib/sneaql_standard.rb', line 20

# Entry point for a full run.
#
# Runs, in order: set_params, configure_jdbc_driver, build_transform_queue
# and run_transforms.
#
# @return [Object] whatever run_transforms returns
def run
  set_params
  configure_jdbc_driver
  build_transform_queue
  run_transforms
end

#run_transformsObject

perform concurrent transform run



215
216
217
218
219
220
221
222
# File 'lib/sneaql_standard.rb', line 215

# Hands the queued transforms to ParallelizeSneaqlTransforms.
#
# The object is constructed with @q, @params[:concurrency] and the logger.
# NOTE(review): presumably the constructor itself performs the run - confirm
# against ParallelizeSneaqlTransforms.
#
# @return [ParallelizeSneaqlTransforms] the newly created instance
def run_transforms
  worker_count = @params[:concurrency]
  ParallelizeSneaqlTransforms.new(@q, worker_count, logger)
end

#set_paramsObject

processes environment variables



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/sneaql_standard.rb', line 71

# Populates @params from environment variables.
#
# Required variables (SNEAQL_JDBC_URL, SNEAQL_DB_USER, SNEAQL_DB_PASS,
# SNEAQL_JDBC_DRIVER_JAR, SNEAQL_JDBC_DRIVER_CLASS) must be present, and the
# ones carrying a validation must match it from the very start of the value.
# Optional variables fall back to a default. :concurrency is cast with to_i
# and :database is derived from the jdbc url via Sneaql::Core.database_type.
#
# @raise [RuntimeError] when a required variable is missing or fails its
#   validation; the error is logged before being re-raised
# @return [Object] the value assigned to @params[:database]
def set_params
  @params = {}

  # each of these lil' hashes represents an env_var
  # that is required, as well as an optional
  # regex validation (anchored with \A so that only the
  # beginning of the whole value counts, not of any line)
  [
    { var: 'SNEAQL_JDBC_URL', sym: :jdbc_url, validation: /\Ajdbc:.+/i },
    { var: 'SNEAQL_DB_USER', sym: :db_user },
    { var: 'SNEAQL_DB_PASS', sym: :db_pass },
    { var: 'SNEAQL_JDBC_DRIVER_JAR', sym: :jdbc_driver_jar, validation: %r{\A(http://.+|file://.+|s3://.+)}i },
    { var: 'SNEAQL_JDBC_DRIVER_CLASS', sym: :jdbc_driver_class }
  ].each do |env_var|
    value = ENV[env_var[:var]]
    raise "required environment variable #{env_var[:var]} not provided" unless value

    # assign the value of the env_var to the symbol key of @params
    @params[env_var[:sym]] = value

    # validate if a validation is provided
    validation = env_var[:validation]
    next unless validation
    next if value =~ validation

    raise "required environment variable #{env_var[:var]} looks invalid"
  end

  # optional env vars are iterated in a similar manner
  # but instead of validation they have a default
  [
    {
      var: 'SNEAQL_JDBC_DRIVER_JAR_MD5',
      sym: :jdbc_driver_jar_md5,
      default: nil
    },
    {
      var: 'SNEAQL_METADATA_MANAGER_TYPE',
      sym: :step_metadata_manager_type,
      default: 'transform_steps_table'
    },
    {
      var: 'SNEAQL_REPO_BASE_DIR',
      sym: :repo_base_dir,
      default: '/tmp/sneaql/repos'
    },
    {
      var: 'SNEAQL_TRANSFORM_CONCURRENCY',
      sym: :concurrency,
      default: 1
    },
    {
      var: 'SNEAQL_TRANSFORM_TABLE_NAME',
      sym: :transform_table_name,
      default: 'sneaql.transforms'
    }
  ].each do |env_var|
    @params[env_var[:sym]] = ENV.fetch(env_var[:var], env_var[:default])
  end

  # numeric parameter provided by env var should be casted
  @params[:concurrency] = @params[:concurrency].to_i

  # determine database type based on the jdbc url
  # while technically any jdbc driver should work
  # with sneaql, the database type allows for better
  # handling of transactions, boolean, etc.
  @params[:database] = Sneaql::Core.database_type(@params[:jdbc_url])
rescue => e
  logger.error(e.message)
  raise
end