Top Level Namespace

Defined Under Namespace

Modules: DataPipe, Telemetry. Classes: StreamBuilder.

Instance Method Summary

Instance Method Details

#DbToDir(db_env_name, sql, splitField, path, prefix) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/DbToDir.rb', line 4

# Runs +sql+ against the database obtained for +db_env_name+, groups the rows
# by the value found under +splitField+, and writes one file per group named
# "<path>/<prefix>-<group value>.js", serialized through StreamBuilder.
#
# Files already matching "<path>/<prefix>-*" are deleted before writing.
# The directory itself is not created here, so it has to exist already.
#
# @param db_env_name handed to DataPipe.getFluidDb to obtain the connection
# @param sql [String] query to run; no bind parameters are supplied
# @param splitField key looked up in every row to decide its group
# @param path [String] directory that receives the files
# @param prefix [String] file name prefix (a "-" is appended)
# @return [Hash] group value => array of the rows in that group
def DbToDir( db_env_name, sql, splitField, path, prefix )
    rows = DataPipe.getFluidDb( db_env_name ).queryForResultset( sql, [] )

    # The header comes from the first row's keys (rows are assumed to be
    # hash-like). It stays nil for an empty result, in which case no group
    # exists and the header is never used.
    header = rows.length > 0 ? rows[0].keys : nil

    grouped = {}
    rows.each do |row|
        key = row[splitField]
        grouped[key] = [] unless grouped.has_key?( key )
        grouped[key] << row
    end

    filePrefix = "#{path}/#{prefix}-"
    Dir.glob( "#{filePrefix}*" ).each do |stale|
        File.delete( stale )
    end

    grouped.each do |key, members|
        builder = StreamBuilder.new.f( header )
        members.each do |row|
            builder.add( *row.values )
        end
        File.write( "#{filePrefix}#{key}.js", builder.serialize )
    end

    grouped
end

#DbToJson(db_env_name, sql, splitField, path, prefix) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/DbToJson.rb', line 5

# Runs +sql+ against the database obtained for +db_env_name+, groups the rows
# by the value found under +splitField+, and writes each group as JSON to
# "<path>/<prefix>-<group value>.js".
#
# +path+ is created when it does not exist yet (one level only, via
# Dir.mkdir). Files already matching "<path>/<prefix>-*" are deleted first.
#
# @param db_env_name handed to DataPipe.getFluidDb to obtain the connection
# @param sql [String] query to run; no bind parameters are supplied
# @param splitField key looked up in every row to decide its group
# @param path [String] directory that receives the files
# @param prefix [String] file name prefix (a "-" is appended)
# @return [Hash] group value => array of the rows in that group
def DbToJson( db_env_name, sql, splitField, path, prefix )
    # Dir.exists? was removed in Ruby 3.2; Dir.exist? is available everywhere.
    Dir.mkdir( path ) unless Dir.exist?( path )

    db = DataPipe.getFluidDb( db_env_name )
    rst = db.queryForResultset( sql, [] )

    hash = {}
    rst.each do |r|
        ( hash[r[splitField]] ||= [] ) << r
    end

    basePath = "#{path}/#{prefix}-"
    Dir.glob( "#{basePath}*" ).each { |f| File.delete( f ) }

    hash.each do |k, v|
        File.write( "#{basePath}#{k}.js", v.to_json )
    end

    hash
end

#JsonToPgsql(source_env_name, destination_env_name, tableName, columns) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/JsonToPgsql.rb', line 5

# Replaces the contents of +tableName+ with rows read from a JSON file.
#
# The file path is resolved with DataPipe.getEnvVar( source_env_name ). The
# table is truncated and then filled through a pipe-delimited COPY ... FROM
# STDIN, sending for each row the values of +columns+ in the given order.
# Finally "<tableName>: <row count>" is logged.
#
# @param source_env_name passed to DataPipe.getEnvVar to find the JSON file
# @param destination_env_name passed to DataPipe.getFluidDb for the target db
# @param tableName [String] table that is truncated and reloaded
# @param columns [Array<String>] column names; also used as keys into each row
# @return whatever DataPipe.log returns
def JsonToPgsql( source_env_name, destination_env_name, tableName, columns )
    # Read and parse the source before touching the destination, so an
    # unreadable or malformed file does not leave the table truncated.
    # File.read is used rather than IO.read so the configured value is always
    # treated as a plain file path.
    # NOTE(review): assumes the JSON document is an array of objects — confirm.
    rows = JSON.parse( File.read( DataPipe.getEnvVar( source_env_name ) ) )

    d = DataPipe.getFluidDb( destination_env_name )

    d.execute( "TRUNCATE TABLE #{tableName}", [])

    d.connection.exec( "COPY #{tableName} (#{columns.join( "," )}) FROM STDIN WITH DELIMITER AS '|' CSV;" )

    rows.each do |row|
        # Values are joined as-is; nothing is quoted or escaped here.
        l = columns.map { |name| row[name] }
        d.connection.put_copy_data "#{l.join( '|' )}\n"
    end
    d.connection.put_copy_end

    DataPipe.log "#{tableName}: #{rows.length}", true
end

#PathFromRemote(remoteUri, localPath, prefix) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/PathFromRemote.rb', line 5

# Moves files whose name starts with +prefix+ from an SFTP directory into
# +localPath+.
#
# Local files matching "<localPath>/<prefix>*" are deleted first. Every
# regular remote file in the URI's path whose name starts with +prefix+ is
# then downloaded to "<localPath>/<name>" and removed from the remote side.
#
# @param remoteUri [String] URI supplying host, user, password and directory
# @param localPath [String] local directory receiving the files
# @param prefix [String] file name prefix selecting the files
# @return the value returned by Net::SFTP.start
def PathFromRemote( remoteUri, localPath, prefix )
    uri = URI.parse( remoteUri )
    DataPipe.log "remoteUri: #{remoteUri}, localPath: #{localPath}, prefix: #{prefix}", true

    Net::SFTP.start( uri.host, uri.user, :password => uri.password ) do |sftp|
        # Clear files left over from a previous run. File has no `rm`
        # method; File.delete is the call that removes a file.
        Dir.glob( "#{localPath}/#{prefix}*" ).each do |path|
            File.delete( path )
        end

        sftp.dir.foreach( uri.path ) do |entry|
            name = entry.name
            next unless name.start_with?( prefix ) && entry.file?

            remoteFile = "#{uri.path}/#{name}"
            DataPipe.log "sftp.rm: #{remoteFile}"
            # Give download! an explicit local target: without one the content
            # is only returned in memory, and the remote copy is deleted next.
            sftp.download!( remoteFile, "#{localPath}/#{name}" )
            sftp.remove!( remoteFile )
        end
    end
end

#PathToRemote(sourcePath, remoteUri, prefix) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/PathToRemote.rb', line 5

# Replaces the prefixed files in an SFTP directory with the local ones.
#
# Every regular remote file in the URI's path whose name starts with +prefix+
# is removed; afterwards each local file matching "<sourcePath>/<prefix>*" is
# uploaded into that remote directory under its base name.
#
# @param sourcePath [String] local directory holding the files to send
# @param remoteUri [String] URI supplying host, user, password and directory
# @param prefix [String] file name prefix selecting the files on both sides
# @return the value returned by Net::SFTP.start
def PathToRemote( sourcePath, remoteUri, prefix )
    target = URI.parse( remoteUri )
    DataPipe.log "sourcePath: #{sourcePath}, remoteUri: #{remoteUri}, prefix: #{prefix}", true

    Net::SFTP.start( target.host, target.user, :password => target.password ) do |sftp|
        # Step 1: drop the matching files that are already on the remote side.
        sftp.dir.foreach( target.path ) do |entry|
            name = entry.name
            next unless name.start_with?( prefix ) && entry.file?

            DataPipe.log "sftp.rm: #{target.path}/#{name}"
            sftp.remove!( "#{target.path}/#{name}" )
        end

        # Step 2: push the current local files.
        Dir.glob( "#{sourcePath}/#{prefix}*" ).each do |localFile|
            sftp.upload!( localFile, "#{target.path}/#{File.basename( localFile )}" )
        end
    end
end

#PgsqlToPgsql(source_env_name, destination_env_name, sql, tableName, columns) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/PgsqlToPgsql.rb', line 4

# Replaces the contents of +tableName+ in the destination database with the
# result of +sql+ run on the source database.
#
# The destination table is truncated, the query is executed on the source
# connection, and every result row is streamed into a pipe-delimited
# COPY ... FROM STDIN. Only the first columns.length fields of each result
# row are read, by position. Finally "<tableName>: <row count>" is logged.
#
# @param source_env_name passed to DataPipe.getFluidDb for the source db
# @param destination_env_name passed to DataPipe.getFluidDb for the target db
# @param sql [String] query executed on the source connection
# @param tableName [String] table that is truncated and reloaded
# @param columns [Array<String>] destination column names, in result order
# @return whatever DataPipe.log returns
def PgsqlToPgsql( source_env_name, destination_env_name, sql, tableName, columns )
    source = DataPipe.getFluidDb( source_env_name )
    target = DataPipe.getFluidDb( destination_env_name )

    target.execute( "TRUNCATE TABLE #{tableName}", [] )

    results = source.connection.exec( sql )

    target.connection.exec( "COPY #{tableName} (#{columns.join( "," )}) FROM STDIN WITH DELIMITER AS '|' CSV;" )

    count = results.ntuples
    width = columns.length
    count.times do |rowIdx|
        # Values are joined as-is; nothing is quoted or escaped here.
        fields = Array.new( width ) { |colIdx| results.getvalue( rowIdx, colIdx ) }
        target.connection.put_copy_data "#{fields.join( '|' )}\n"
    end
    target.connection.put_copy_end

    DataPipe.log "#{tableName}: #{count}", true
end

#SqlServerToPgsql(s_s, d_s, sql, tableName, columns) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/SqlServerToPgsql.rb', line 4

# Replaces the contents of +tableName+ in the destination database with the
# result of +sql+ run on the source database.
#
# Both connections are opened with FluidDb::Db from the connection strings
# stored in the environment variables named by +s_s+ and +d_s+. The
# destination table is truncated, then each source row is streamed into a
# pipe-delimited COPY ... FROM STDIN, and "<tableName>: <row count>" is logged.
#
# @param s_s [String] name of the environment variable for the source db
# @param d_s [String] name of the environment variable for the destination db
# @param sql [String] query executed on the source connection
# @param tableName [String] table that is truncated and reloaded
# @param columns [Array<String>] destination column names for the COPY
# @return whatever DataPipe.log returns
def SqlServerToPgsql( s_s, d_s, sql, tableName, columns )
    source = FluidDb::Db( ENV[s_s] )
    target = FluidDb::Db( ENV[d_s] )

    target.connection.exec( "TRUNCATE TABLE #{tableName}" )
    target.connection.exec( "COPY #{tableName} (#{columns.join( "," )}) FROM STDIN WITH DELIMITER AS '|' CSV;" )

    rowCount = 0
    # NOTE(review): the options presumably ask the driver for array rows
    # without caching them — confirm against the source driver's docs.
    source.connection.execute( sql ).each( :as => :array, :cache_rows => false ) do |fields|
        rowCount += 1
        # Values are joined as-is; nothing is quoted or escaped here.
        target.connection.put_copy_data "#{fields.join( '|' )}\n"
    end
    target.connection.put_copy_end

    DataPipe.log "#{tableName}: #{rowCount}"
end