Top Level Namespace

Defined Under Namespace

Modules: DataPipe2

Instance Method Summary collapse

Instance Method Details

#csv_to_db(file_name, table_name) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
# File 'lib/fns/csv_to_db.rb', line 3

def csv_to_db(file_name, table_name)
  db_env_name = 'DB'
  db = DataPipe2.get_fluid_db(db_env_name)
  db.execute("TRUNCATE TABLE #{table_name}", [])

  csv = CSV.read(file_name)
  columns = csv.shift
  sql = "INSERT INTO #{table_name} ( #{columns.join(',')} )
          VALUES (#{Array.new(columns.length, '?').join(',')})"
  csv.each do |row|
    db.execute(sql, row)
  end
  DataPipe2.log "#{table_name}: #{csv.length}", true
end

#db_to_csv(sql, path, name) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
# File 'lib/fns/db_to_csv.rb', line 3

def db_to_csv(sql, path, name)
  db_env_name = 'DB'
  db = DataPipe2.get_fluid_db(db_env_name)

  file_path = "#{path}/#{name}"
  File.delete(file_path) if File.exist?(file_path)

  rst = db.queryForResultset(sql, [])

  CSV.open(file_path, 'w') do |csv|
    csv << rst[0].keys
    rst.each { |r| csv << r.values }
  end
end

#db_to_json(sql, path, name) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
# File 'lib/fns/db_to_json.rb', line 4

def db_to_json(sql, path, name)
  # Dir.mkdir(path) unless Dir.exists?(path)
  db_env_name = 'DB'
  db = DataPipe2.get_fluid_db(db_env_name)

  file_path = "#{path}/#{name}"
  File.delete(file_path) if File.exist?(file_path)

  rst = db.queryForResultset(sql, [])
  File.write(file_path, rst.to_json)
end

#mssql_to_pgsql(select_sql, table_name, columns) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/fns/mssql_to_pgsql.rb', line 2

def mssql_to_pgsql(select_sql, table_name, columns)
  s = FluidDb::Db(ENV['MSSQL']) # source
  d = FluidDb::Db(ENV['PGSQL']) # destination

  d.connection.exec("TRUNCATE TABLE #{table_name}")
  copy_cmd = %{
    COPY #{table_name} (#{columns})
    FROM STDIN
    WITH DELIMITER AS '|'
    CSV;
  }
  d.connection.exec(copy_cmd)

  results = s.connection.execute(select_sql)

  count = 0
  results.each(as: :array, cache_rows: false) do |r|
    count += 1
    print '.' if count % 100_000 == 0
    d.connection.put_copy_data "#{r.join('|')}\n"
  end
  puts ""
  d.connection.put_copy_end

  DataPipe2.log "#{table_name}: #{count}"
end

#smb_to_local(smb_uri_string, local_folder) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/fns/smb_to_local.rb', line 5

def smb_to_local(smb_uri_string, local_folder)
  # encode the string
  uri = URI.parse(URI.escape(smb_uri_string))
  domain = nil
  unless uri.query.nil?
    params = CGI.parse(uri.query)
    domain = params['domain'][0]
  end
  p "#{domain}, #{uri.host}, #{uri.user}, #{uri.password}"
  client = Sambal::Client.new(domain: domain, host: uri.host,
                              share: 'Data', user: uri.user, password: uri.password)
  client.ls(remote_dirpath).each do |remote_filepath|
    local_filepath = local_folder
    client.get(remote_filepath, local_filepath) # downloads file from server
    client.del(remote_filepath) # deletes files from server
  end
  client.close # closes connection
end