Class: Jrb::Hdfs::Client
- Inherits:
-
Object
- Object
- Jrb::Hdfs::Client
- Defined in:
- lib/jrb/hdfs/client.rb
Instance Method Summary collapse
-
#copy_to_local(hdfs_src, local_dst) ⇒ Boolean
Copy a hdfs file to local fs.
-
#exists?(path) ⇒ Boolean
Check if entry exists in hdfs.
-
#initialize(uri, conf_path, opts = {}) ⇒ Client
constructor
Constructor.
-
#list(path, recursively = false) ⇒ Array[org.apache.hadoop.fs.Path]
Get list of hdfs entry.
-
#mkdir(path) ⇒ Boolean
(also: #mkdir_p)
Mkdir in hdfs.
-
#put(local_src, hdfs_dst) ⇒ Boolean
Put local entry into hdfs.
-
#rm(path, recursively = false) ⇒ Boolean
Remove a hdfs entry.
Constructor Details
#initialize(uri, conf_path, opts = {}) ⇒ Client
Constructor
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/jrb/hdfs/client.rb', line 20 def initialize(uri, conf_path, opts={}) @uri = uri @hdfs_conf = Configuration.new @hdfs_conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem") conf_files = opts[:conf_files] || ['core-site.xml', 'hdfs-site.xml'] conf_files.each do |config_file| @hdfs_conf.add_resource(Path.new(File.join(conf_path, config_file))) end if opts[:use_kerberos] == true @hdfs_conf.set("hadoop.security.authentication", "kerberos") UserGroupInformation.setConfiguration(@hdfs_conf) UserGroupInformation.loginUserFromKeytab(opts[:kerberos_username], opts[:kerberos_keytab_path]) end @hdfs = org.apache.hadoop.fs.FileSystem.get(URI.create(uri), @hdfs_conf) end |
Instance Method Details
#copy_to_local(hdfs_src, local_dst) ⇒ Boolean
Copy a hdfs file to local fs
67 68 69 70 71 72 73 74 75 |
# File 'lib/jrb/hdfs/client.rb', line 67 def copy_to_local(hdfs_src, local_dst) return false unless exists?(hdfs_src) begin @hdfs.copy_to_local_file(false, Path.new(hdfs_src), Path.new(local_dst), true) true rescue false end end |
#exists?(path) ⇒ Boolean
Check if entry exists in hdfs
82 83 84 |
# File 'lib/jrb/hdfs/client.rb', line 82 def exists?(path) @hdfs.exists(Path.new(path)) end |
#list(path, recursively = false) ⇒ Array[org.apache.hadoop.fs.Path]
Get list of hdfs entry
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/jrb/hdfs/client.rb', line 45 def list(path, recursively=false) if recursively paths = [] dir_itr = @hdfs.listFiles(Path.new(path), true) while dir_itr.hasNext next_path = dir_itr.next.getPath paths << next_path end paths else file_status = @hdfs.listStatus(Path.new(path)) FileUtil.stat2Paths(file_status) end end |
#mkdir(path) ⇒ Boolean Also known as: mkdir_p
Mkdir in hdfs
118 119 120 |
# File 'lib/jrb/hdfs/client.rb', line 118 def mkdir(path) @hdfs.mkdirs(Path.new(path)) end |
#put(local_src, hdfs_dst) ⇒ Boolean
Put local entry into hdfs
103 104 105 106 107 108 109 110 111 |
# File 'lib/jrb/hdfs/client.rb', line 103 def put(local_src, hdfs_dst) return false unless File.exists?(local_src) begin @hdfs.copyFromLocalFile(false, true, Path.new(local_src), Path.new(hdfs_dst)) true rescue false end end |
#rm(path, recursively = false) ⇒ Boolean
Remove a hdfs entry
92 93 94 95 |
# File 'lib/jrb/hdfs/client.rb', line 92 def rm(path, recursively=false) return false unless exists?(path) @hdfs.delete(Path.new(path), recursively) end |