Class: Jrb::Hdfs::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/jrb/hdfs/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(uri, conf_path, opts = {}) ⇒ Client

Constructor

Parameters:

  • uri (String)

    URI of the HDFS namenode

  • conf_path (String)

    Hadoop configuration directory

  • opts (Hash) (defaults to: {})

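    Options: [Array] conf_files — configuration files to add as hdfs_conf resources (defaults to core-site.xml and hdfs-site.xml); [Boolean] use_kerberos — whether to log in with Kerberos; [String] kerberos_username — Kerberos user name used for the keytab login; [String] kerberos_keytab_path — path to the Kerberos keytab file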



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/jrb/hdfs/client.rb', line 20

# Builds a client for the HDFS instance addressed by +uri+.
#
# @param uri [String] URI of the HDFS namenode
# @param conf_path [String] directory containing the Hadoop configuration files
# @param opts [Hash] optional settings
# @option opts [Array<String>] :conf_files file names (relative to +conf_path+)
#   added as configuration resources; defaults to core-site.xml and hdfs-site.xml
# @option opts [Boolean] :use_kerberos Kerberos login is performed only when
#   this value compares equal to +true+
# @option opts [String] :kerberos_username user passed to loginUserFromKeytab
# @option opts [String] :kerberos_keytab_path keytab passed to loginUserFromKeytab
def initialize(uri, conf_path, opts={})
  @uri = uri

  @hdfs_conf = Configuration.new
  @hdfs_conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")

  # Register each configuration file as a resource, in the order given.
  resource_names = opts[:conf_files] || ['core-site.xml', 'hdfs-site.xml']
  resource_names.each do |resource_name|
    resource_path = File.join(conf_path, resource_name)
    @hdfs_conf.add_resource(Path.new(resource_path))
  end

  # Deliberately strict: truthy values other than `true` do not enable Kerberos.
  kerberos_requested = (opts[:use_kerberos] == true)
  if kerberos_requested
    @hdfs_conf.set("hadoop.security.authentication", "kerberos")
    UserGroupInformation.setConfiguration(@hdfs_conf)
    UserGroupInformation.loginUserFromKeytab(
      opts[:kerberos_username],
      opts[:kerberos_keytab_path]
    )
  end

  @hdfs = org.apache.hadoop.fs.FileSystem.get(
    URI.create(uri),
    @hdfs_conf
  )
end

Instance Method Details

#copy_to_local(hdfs_src, local_dst) ⇒ Boolean

Copy an HDFS file to the local filesystem

Parameters:

  • hdfs_src (String)
  • local_dst (String)

Returns:

  • (Boolean)


67
68
69
70
71
72
73
74
75
# File 'lib/jrb/hdfs/client.rb', line 67

# Copies an HDFS file to the local filesystem.
#
# @param hdfs_src [String] source path in HDFS
# @param local_dst [String] destination path on the local filesystem
# @return [Boolean] true when the copy call completes; false when the source
#   does not exist or the copy raises an error caught by the bare rescue
def copy_to_local(hdfs_src, local_dst)
  if exists?(hdfs_src)
    begin
      # Arguments follow Hadoop's copyToLocalFile(delSrc, src, dst,
      # useRawLocalFileSystem): keep the source, use the raw local filesystem.
      @hdfs.copy_to_local_file(
        false,
        Path.new(hdfs_src),
        Path.new(local_dst),
        true
      )
      true
    rescue
      false
    end
  else
    false
  end
end

#exists?(path) ⇒ Boolean

Check whether an entry exists in HDFS

Parameters:

  • path (String)

Returns:

  • (Boolean)


82
83
84
# File 'lib/jrb/hdfs/client.rb', line 82

# Checks whether an entry exists in HDFS.
#
# @param path [String] HDFS path to look up
# @return [Boolean] result of the filesystem's exists call for that path
def exists?(path)
  hdfs_path = Path.new(path)
  @hdfs.exists(hdfs_path)
end

#list(path, recursively = false) ⇒ Array[org.apache.hadoop.fs.Path]

List the entries under an HDFS path

Parameters:

  • path (String)
  • recursively (Boolean) (defaults to: false)

Returns:

  • (Array[org.apache.hadoop.fs.Path])


45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/jrb/hdfs/client.rb', line 45

# Lists the entries under an HDFS path.
#
# @param path [String] HDFS path to list
# @param recursively [Boolean] when truthy, walks the tree with listFiles;
#   otherwise lists only the given path with listStatus
# @return [Array[org.apache.hadoop.fs.Path]] paths collected from the listing
#   (the non-recursive case returns whatever FileUtil.stat2Paths produces)
def list(path, recursively=false)
  root = Path.new(path)
  return FileUtil.stat2Paths(@hdfs.listStatus(root)) unless recursively

  # listFiles hands back an iterator exposing hasNext/next, so it is drained by hand.
  collected = []
  entries = @hdfs.listFiles(root, true)
  collected << entries.next.getPath while entries.hasNext
  collected
end

#mkdir(path) ⇒ Boolean Also known as: mkdir_p

Create a directory in HDFS

Parameters:

  • path (String)

Returns:

  • (Boolean)


118
119
120
# File 'lib/jrb/hdfs/client.rb', line 118

# Creates a directory in HDFS by delegating to the filesystem's mkdirs call.
#
# @param path [String] HDFS directory path to create
# @return [Boolean] value returned by mkdirs
def mkdir(path)
  directory = Path.new(path)
  @hdfs.mkdirs(directory)
end

#put(local_src, hdfs_dst) ⇒ Boolean

Copy a local entry into HDFS

Parameters:

  • local_src (String)
  • hdfs_dst (String)

Returns:

  • (Boolean)


103
104
105
106
107
108
109
110
111
# File 'lib/jrb/hdfs/client.rb', line 103

# Copies a local entry into HDFS.
#
# @param local_src [String] path on the local filesystem
# @param hdfs_dst [String] destination path in HDFS
# @return [Boolean] true when the copy call completes; false when the local
#   source does not exist or the copy raises an error caught by the bare rescue
def put(local_src, hdfs_dst)
  # File.exist? replaces File.exists?, which was deprecated and is removed in
  # Ruby 3.2+, where calling it raises NoMethodError.
  return false unless File.exist?(local_src)

  begin
    # Arguments follow Hadoop's copyFromLocalFile(delSrc, overwrite, src, dst):
    # keep the local source and overwrite an existing destination.
    @hdfs.copyFromLocalFile(false, true, Path.new(local_src), Path.new(hdfs_dst))
    true
  rescue
    false
  end
end

#rm(path, recursively = false) ⇒ Boolean

Remove an HDFS entry

Parameters:

  • path (String)
  • recursively (Boolean) (defaults to: false)

Returns:

  • (Boolean)


92
93
94
95
# File 'lib/jrb/hdfs/client.rb', line 92

# Removes an entry from HDFS.
#
# @param path [String] HDFS path to delete
# @param recursively [Boolean] forwarded as the second argument of the
#   filesystem's delete call
# @return [Boolean] false when the path does not exist; otherwise the value
#   returned by delete
def rm(path, recursively=false)
  if exists?(path)
    target = Path.new(path)
    @hdfs.delete(target, recursively)
  else
    false
  end
end