Top Level Namespace
Defined Under Namespace
Modules: DataPipe, Telemetry
Classes: StreamBuilder
Instance Method Summary
- #DbToDir(db_env_name, sql, splitField, path, prefix) ⇒ Object
- #DbToJson(db_env_name, sql, splitField, path, prefix) ⇒ Object
- #JsonToPgsql(source_env_name, destination_env_name, tableName, columns) ⇒ Object
- #PathFromRemote(remoteUri, localPath, prefix) ⇒ Object
- #PathToRemote(sourcePath, remoteUri, prefix) ⇒ Object
- #PgsqlToPgsql(source_env_name, destination_env_name, sql, tableName, columns) ⇒ Object
- #SqlServerToPgsql(s_s, d_s, sql, tableName, columns) ⇒ Object
Instance Method Details
#DbToDir(db_env_name, sql, splitField, path, prefix) ⇒ Object
# File 'lib/DbToDir.rb', line 4
def DbToDir( db_env_name, sql, splitField, path, prefix )
  db = DataPipe.getFluidDb( db_env_name )

  # Group the resultset rows by the value of splitField.
  hash = Hash.new
  rst = db.queryForResultset( sql, [] )
  columns = rst.length > 0 ? rst[0].keys : []
  rst.each do |r|
    hash[r[splitField]] = Array.new unless hash.has_key?( r[splitField] )
    hash[r[splitField]] << r
  end

  # Remove any previous export files for this prefix, then write one
  # serialized file per group via StreamBuilder.
  basePath = "#{path}/#{prefix}-"
  Dir.glob( "#{basePath}*" ).each { |f| File.delete( f ) }
  hash.each do |k, v|
    s = StreamBuilder.new.f( columns )
    v.each { |r| s.add( *r.values ) }
    File.write( "#{basePath}#{k}.js", s.serialize )
  end
  return hash
end
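A minimal usage sketch; the ORDERS_DB variable, connection URI, query, and paths are illustrative, not part of the library:

ENV['ORDERS_DB'] = 'pgsql://user:pass@localhost/orders'
groups = DbToDir( 'ORDERS_DB',
                  'SELECT region, id, total FROM orders',
                  'region',        # one output file per distinct region value
                  '/tmp/exports',  # target directory; DbToDir assumes it exists
                  'orders' )       # files are written as orders-<region>.js
# The return value maps each region to its array of row hashes.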
#DbToJson(db_env_name, sql, splitField, path, prefix) ⇒ Object
# File 'lib/DbToJson.rb', line 5
def DbToJson( db_env_name, sql, splitField, path, prefix )
  Dir.mkdir( path ) unless Dir.exist?( path )
  db = DataPipe.getFluidDb( db_env_name )

  # Group the resultset rows by the value of splitField.
  hash = Hash.new
  rst = db.queryForResultset( sql, [] )
  rst.each do |r|
    hash[r[splitField]] = Array.new unless hash.has_key?( r[splitField] )
    hash[r[splitField]] << r
  end

  # Remove any previous export files for this prefix, then write one
  # JSON file per group.
  basePath = "#{path}/#{prefix}-"
  Dir.glob( "#{basePath}*" ).each { |f| File.delete( f ) }
  hash.each do |k, v|
    File.write( "#{basePath}#{k}.js", v.to_json )
  end
  return hash
end
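The same pattern with plain JSON output; here the directory is created if missing. Again, the environment variable name, URI, and query are hypothetical:

ENV['ORDERS_DB'] = 'pgsql://user:pass@localhost/orders'
DbToJson( 'ORDERS_DB',
          'SELECT region, id, total FROM orders',
          'region',
          '/tmp/exports',  # created by DbToJson if it does not exist
          'orders' )       # writes orders-<region>.js, each a JSON array of rows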
#JsonToPgsql(source_env_name, destination_env_name, tableName, columns) ⇒ Object
# File 'lib/JsonToPgsql.rb', line 5
def JsonToPgsql( source_env_name, destination_env_name, tableName, columns )
  d = DataPipe.getFluidDb( destination_env_name )
  d.execute( "TRUNCATE TABLE #{tableName}", [] )

  # Stream rows from the JSON file named by the source environment
  # variable into PostgreSQL via COPY ... FROM STDIN.
  d.connection.exec( "COPY #{tableName} (#{columns.join( ',' )}) FROM STDIN WITH DELIMITER AS '|' CSV;" )
  count = 0
  JSON.parse( IO.read( DataPipe.getEnvVar( source_env_name ) ) ).each do |row|
    l = Array.new
    columns.each do |name|
      l << row[name]
    end
    d.connection.put_copy_data "#{l.join( '|' )}\n"
    count += 1
  end
  d.connection.put_copy_end
  DataPipe.log "#{tableName}: #{count}", true
end
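A hedged example, assuming a hypothetical ORDERS_JSON variable that points at a file containing a JSON array of row objects, and a DEST_DB variable holding the destination URI:

ENV['ORDERS_JSON'] = '/tmp/exports/orders-north.js'
ENV['DEST_DB']     = 'pgsql://user:pass@localhost/warehouse'
JsonToPgsql( 'ORDERS_JSON', 'DEST_DB',
             'orders_import',             # destination table; truncated first
             ['region', 'id', 'total'] )  # keys looked up in each JSON object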
#PathFromRemote(remoteUri, localPath, prefix) ⇒ Object
# File 'lib/PathFromRemote.rb', line 5
def PathFromRemote( remoteUri, localPath, prefix )
  uri = URI.parse( remoteUri )
  DataPipe.log "remoteUri: #{remoteUri}, localPath: #{localPath}, prefix: #{prefix}", true
  Net::SFTP.start( uri.host, uri.user, :password => uri.password ) do |sftp|
    # Clear out any previous local copies for this prefix.
    Dir.glob( "#{localPath}/#{prefix}*" ).each do |path|
      File.delete( path )
    end

    # Download each matching remote file, then remove it from the server.
    sftp.dir.foreach( uri.path ) do |entry|
      name = entry.name
      if name[0, prefix.length] == prefix && entry.file? then
        DataPipe.log "sftp.download+rm: #{uri.path}/#{name}"
        sftp.download!( "#{uri.path}/#{name}", "#{localPath}/#{name}" )
        sftp.remove!( "#{uri.path}/#{name}" )
      end
    end
  end
end
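An illustrative call; the SFTP URI, credentials, and paths are hypothetical. Matching local files are deleted first, and each remote file is removed once downloaded, so this behaves as a move rather than a copy:

PathFromRemote( 'sftp://user:secret@files.example.com/outbox',
                '/tmp/inbound',
                'orders-' )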
#PathToRemote(sourcePath, remoteUri, prefix) ⇒ Object
# File 'lib/PathToRemote.rb', line 5
def PathToRemote( sourcePath, remoteUri, prefix )
  uri = URI.parse( remoteUri )
  DataPipe.log "sourcePath: #{sourcePath}, remoteUri: #{remoteUri}, prefix: #{prefix}", true
  Net::SFTP.start( uri.host, uri.user, :password => uri.password ) do |sftp|
    # Remove any matching files already on the server.
    sftp.dir.foreach( uri.path ) do |entry|
      name = entry.name
      if name[0, prefix.length] == prefix && entry.file? then
        DataPipe.log "sftp.rm: #{uri.path}/#{name}"
        sftp.remove!( "#{uri.path}/#{name}" )
      end
    end

    # Upload every local file matching the prefix.
    Dir.glob( "#{sourcePath}/#{prefix}*" ).each do |path|
      sftp.upload!( path, "#{uri.path}/#{File.basename( path )}" )
    end
  end
end
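The mirror-image call, again with a hypothetical URI. Matching remote files are deleted before the upload, so the remote directory ends up holding exactly the local set:

PathToRemote( '/tmp/exports',
              'sftp://user:secret@files.example.com/inbox',
              'orders-' )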
#PgsqlToPgsql(source_env_name, destination_env_name, sql, tableName, columns) ⇒ Object
# File 'lib/PgsqlToPgsql.rb', line 4
def PgsqlToPgsql( source_env_name, destination_env_name, sql, tableName, columns )
  s = DataPipe.getFluidDb( source_env_name )
  d = DataPipe.getFluidDb( destination_env_name )
  d.execute( "TRUNCATE TABLE #{tableName}", [] )

  # Read the full resultset from the source, then stream it into the
  # destination via COPY ... FROM STDIN.
  results = s.connection.exec( sql )
  d.connection.exec( "COPY #{tableName} (#{columns.join( ',' )}) FROM STDIN WITH DELIMITER AS '|' CSV;" )
  count = results.ntuples
  fieldCount = columns.length - 1
  0.upto( count - 1 ) do |idx|
    l = Array.new
    0.upto( fieldCount ) do |jdx|
      l << results.getvalue( idx, jdx )
    end
    d.connection.put_copy_data "#{l.join( '|' )}\n"
  end
  d.connection.put_copy_end
  DataPipe.log "#{tableName}: #{count}", true
end
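A hedged example with hypothetical SRC_DB/DEST_DB variables. Note that rows are joined with a literal '|' and no escaping, so this is only safe when the selected values cannot contain that character:

ENV['SRC_DB']  = 'pgsql://user:pass@source-host/app'
ENV['DEST_DB'] = 'pgsql://user:pass@warehouse-host/dw'
PgsqlToPgsql( 'SRC_DB', 'DEST_DB',
              'SELECT region, id, total FROM orders',
              'orders_copy',               # truncated, then repopulated
              ['region', 'id', 'total'] )  # must match the SELECT column order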
#SqlServerToPgsql(s_s, d_s, sql, tableName, columns) ⇒ Object
# File 'lib/SqlServerToPgsql.rb', line 4
def SqlServerToPgsql( s_s, d_s, sql, tableName, columns )
  # s_s and d_s name environment variables that hold the two connection URIs.
  s = FluidDb::Db( ENV[s_s] )
  d = FluidDb::Db( ENV[d_s] )
  d.connection.exec( "TRUNCATE TABLE #{tableName}" )
  d.connection.exec( "COPY #{tableName} (#{columns.join( ',' )}) FROM STDIN WITH DELIMITER AS '|' CSV;" )

  # Stream rows from SQL Server without caching them client-side.
  results = s.connection.execute( sql )
  count = 0
  results.each( :as => :array, :cache_rows => false ) do |r|
    count = count + 1
    d.connection.put_copy_data "#{r.join( '|' )}\n"
  end
  d.connection.put_copy_end
  DataPipe.log "#{tableName}: #{count}"
end
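An illustrative call; unlike the other loaders, the first two arguments name environment variables that are read directly through ENV rather than via DataPipe. The URIs shown are hypothetical:

ENV['MSSQL_SRC'] = 'sqlserver://user:pass@legacy-host/app'
ENV['PG_DEST']   = 'pgsql://user:pass@warehouse-host/dw'
SqlServerToPgsql( 'MSSQL_SRC', 'PG_DEST',
                  'SELECT region, id, total FROM orders',
                  'orders_copy',
                  ['region', 'id', 'total'] )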