Class: WebHDFS::ClientV1
- Inherits:
-
Object
- Object
- WebHDFS::ClientV1
- Defined in:
- lib/webhdfs/client_v1.rb
Direct Known Subclasses
Constant Summary collapse
- OPT_TABLE =
This hash table holds command options.
{}
- REDIRECTED_OPERATIONS =
['APPEND', 'CREATE', 'OPEN', 'GETFILECHECKSUM']
Instance Attribute Summary collapse
-
#doas ⇒ Object
internal use only.
-
#host ⇒ Object
internal use only.
-
#open_timeout ⇒ Object
Returns the value of attribute open_timeout.
-
#port ⇒ Object
internal use only.
-
#read_timeout ⇒ Object
Returns the value of attribute read_timeout.
-
#username ⇒ Object
internal use only.
Instance Method Summary collapse
- #api_path(path) ⇒ Object
-
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND”.
- #build_path(path, op, params) ⇒ Object
-
#check_options(options, optdecl = []) ⇒ Object
def delegation_token(user, options={}) # GETDELEGATIONTOKEN raise NotImplementedError end def renew_delegation_token(token, options={}) # RENEWDELEGATIONTOKEN raise NotImplementedError end def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN raise NotImplementedError end.
- #check_success_json(res, attr = nil) ⇒ Object
-
#checksum(path, options = {}) ⇒ Object
(also: #getfilechecksum)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”.
-
#chmod(path, mode, options = {}) ⇒ Object
(also: #setpermission)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION [&permission=<OCTAL>]”.
-
#chown(path, options = {}) ⇒ Object
(also: #setowner)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER [&owner=<USER>]”.
-
#content_summary(path, options = {}) ⇒ Object
(also: #getcontentsummary)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”.
-
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE [&overwrite=<true|false>][&replication=<SHORT>] [&permission=<OCTAL>]”.
-
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE [&recursive=<true|false>]”.
-
#homedir(options = {}) ⇒ Object
(also: #gethomedirectory)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”.
-
#initialize(host = 'localhost', port = 50070, username = nil, doas = nil) ⇒ ClientV1
constructor
A new instance of ClientV1.
-
#list(path, options = {}) ⇒ Object
(also: #liststatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”.
-
#mkdir(path, options = {}) ⇒ Object
(also: #mkdirs)
curl -i -X PUT “<HOST>:<PORT>/<PATH>?op=MKDIRS”.
- #operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
-
#read(path, options = {}) ⇒ Object
(also: #open)
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN [&offset=<LONG>][&buffersize=<INT>]”.
-
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”.
-
#replication(path, replnum, options = {}) ⇒ Object
(also: #setreplication)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION [&replication=<SHORT>]”.
-
#request(host, port, method, path, op = nil, params = {}, payload = nil) ⇒ Object
IllegalArgumentException 400 Bad Request UnsupportedOperationException 400 Bad Request SecurityException 401 Unauthorized IOException 403 Forbidden FileNotFoundException 404 Not Found RumtimeException 500 Internal Server Error.
-
#stat(path, options = {}) ⇒ Object
(also: #getfilestatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”.
-
#touch(path, options = {}) ⇒ Object
(also: #settimes)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES [&modificationtime=<TIME>]” motidicationtime: radix-10 logn integer accesstime: radix-10 logn integer.
Constructor Details
#initialize(host = 'localhost', port = 50070, username = nil, doas = nil) ⇒ ClientV1
Returns a new instance of ClientV1.
16 17 18 19 20 21 |
# File 'lib/webhdfs/client_v1.rb', line 16 def initialize(host='localhost', port=50070, username=nil, doas=nil) @host = host @port = port @username = username @doas = doas end |
Instance Attribute Details
#doas ⇒ Object
internal use only
13 14 15 |
# File 'lib/webhdfs/client_v1.rb', line 13 def doas @doas end |
#host ⇒ Object
internal use only
13 14 15 |
# File 'lib/webhdfs/client_v1.rb', line 13 def host @host end |
#open_timeout ⇒ Object
Returns the value of attribute open_timeout.
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14 def open_timeout @open_timeout end |
#port ⇒ Object
internal use only
13 14 15 |
# File 'lib/webhdfs/client_v1.rb', line 13 def port @port end |
#read_timeout ⇒ Object
Returns the value of attribute read_timeout.
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14 def read_timeout @read_timeout end |
#username ⇒ Object
internal use only
13 14 15 |
# File 'lib/webhdfs/client_v1.rb', line 13 def username @username end |
Instance Method Details
#api_path(path) ⇒ Object
184 185 186 187 188 189 190 |
# File 'lib/webhdfs/client_v1.rb', line 184 def api_path(path) if path.start_with?('/') '/webhdfs/v1' + path else '/webhdfs/v1/' + path end end |
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND”
34 35 36 37 38 |
# File 'lib/webhdfs/client_v1.rb', line 34 def append(path, body, ={}) (, OPT_TABLE['APPEND']) res = operate_requests('POST', path, 'APPEND', , body) res.code == '200' end |
#build_path(path, op, params) ⇒ Object
192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/webhdfs/client_v1.rb', line 192 def build_path(path, op, params) opts = if @username and @doas {'op' => op, 'user.name' => @username, 'doas' => @doas} elsif @username {'op' => op, 'user.name' => @username} elsif @doas {'op' => op, 'doas' => @doas} else {'op' => op} end query = URI.encode_www_form(params.merge(opts)) api_path(path) + '?' + query end |
#check_options(options, optdecl = []) ⇒ Object
def delegation_token(user, options={}) # GETDELEGATIONTOKEN
raise NotImplementedError
end def renew_delegation_token(token, options={}) # RENEWDELEGATIONTOKEN
raise NotImplementedError
end def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN
raise NotImplementedError
end
175 176 177 178 |
# File 'lib/webhdfs/client_v1.rb', line 175 def (, optdecl=[]) ex = .keys - (optdecl || []) raise ArgumentError, "no such option: #{ex.keys.join(' ')}" unless ex.empty? end |
#check_success_json(res, attr = nil) ⇒ Object
180 181 182 |
# File 'lib/webhdfs/client_v1.rb', line 180 def check_success_json(res, attr=nil) res.code == '200' and res.content_type == 'application/json' and (attr.nil? or JSON.parse(res.body)[attr]) end |
#checksum(path, options = {}) ⇒ Object Also known as: getfilechecksum
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”
104 105 106 107 108 |
# File 'lib/webhdfs/client_v1.rb', line 104 def checksum(path, ={}) (, OPT_TABLE['GETFILECHECKSUM']) res = operate_requests('GET', path, 'GETFILECHECKSUM', ) check_success_json(res, 'FileChecksum') end |
#chmod(path, mode, options = {}) ⇒ Object Also known as: setpermission
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
[&permission=<OCTAL>]"
121 122 123 124 125 |
# File 'lib/webhdfs/client_v1.rb', line 121 def chmod(path, mode, ={}) (, OPT_TABLE['SETPERMISSION']) res = operate_requests('PUT', path, 'SETPERMISSION', .merge({'permission' => mode})) res.code == '200' end |
#chown(path, options = {}) ⇒ Object Also known as: setowner
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
[&owner=<USER>][&group=<GROUP>]"
130 131 132 133 134 135 136 137 |
# File 'lib/webhdfs/client_v1.rb', line 130 def chown(path, ={}) (, OPT_TABLE['SETOWNER']) unless .has_key?('owner') or .has_key?('group') raise ArgumentError, "'chown' needs at least one of owner or group" end res = operate_requests('PUT', path, 'SETOWNER', ) res.code == '200' end |
#content_summary(path, options = {}) ⇒ Object Also known as: getcontentsummary
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”
96 97 98 99 100 |
# File 'lib/webhdfs/client_v1.rb', line 96 def content_summary(path, ={}) (, OPT_TABLE['GETCONTENTSUMMARY']) res = operate_requests('GET', path, 'GETCONTENTSUMMARY', ) check_success_json(res, 'ContentSummary') end |
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
[&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
[&permission=<OCTAL>][&buffersize=<INT>]"
26 27 28 29 30 |
# File 'lib/webhdfs/client_v1.rb', line 26 def create(path, body, ={}) (, OPT_TABLE['CREATE']) res = operate_requests('PUT', path, 'CREATE', , body) res.code == '201' end |
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
[&recursive=<true|false>]"
72 73 74 75 76 |
# File 'lib/webhdfs/client_v1.rb', line 72 def delete(path, ={}) (, OPT_TABLE['DELETE']) res = operate_requests('DELETE', path, 'DELETE', ) check_success_json(res, 'boolean') end |
#homedir(options = {}) ⇒ Object Also known as: gethomedirectory
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”
112 113 114 115 116 |
# File 'lib/webhdfs/client_v1.rb', line 112 def homedir(={}) (, OPT_TABLE['GETHOMEDIRECTORY']) res = operate_requests('GET', '/', 'GETHOMEDIRECTORY', ) check_success_json(res, 'Path') end |
#list(path, options = {}) ⇒ Object Also known as: liststatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”
88 89 90 91 92 |
# File 'lib/webhdfs/client_v1.rb', line 88 def list(path, ={}) (, OPT_TABLE['LISTSTATUS']) res = operate_requests('GET', path, 'LISTSTATUS', ) check_success_json(res, 'FileStatuses')['FileStatus'] end |
#mkdir(path, options = {}) ⇒ Object Also known as: mkdirs
curl -i -X PUT “<HOST>:<PORT>/<PATH>?op=MKDIRS”
52 53 54 55 56 |
# File 'lib/webhdfs/client_v1.rb', line 52 def mkdir(path, ={}) (, OPT_TABLE['MKDIRS']) res = operate_requests('PUT', path, 'MKDIRS', ) check_success_json(res, 'boolean') end |
#operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/webhdfs/client_v1.rb', line 207 def operate_requests(method, path, op, params={}, payload=nil) if REDIRECTED_OPERATIONS.include?(op) res = request(@host, @port, method, path, op, params, nil) unless res.is_a?(Net::HTTPRedirection) and res['location'] msg = "NameNode returns non-redirection (or without location header), code:#{res.code}, body:#{res.body}." raise WebHDFS::RequestFailedError, msg end uri = URI.parse(res['location']) rpath = if uri.query uri.path + '?' + uri.query else uri.path end request(uri.host, uri.port, method, rpath, nil, {}, payload) else request(@host, @port, method, path, op, params, nil) end end |
#read(path, options = {}) ⇒ Object Also known as: open
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
43 44 45 46 47 |
# File 'lib/webhdfs/client_v1.rb', line 43 def read(path, ={}) (, OPT_TABLE['OPEN']) res = operate_requests('GET', path, 'OPEN', ) res.body end |
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”
61 62 63 64 65 66 67 68 |
# File 'lib/webhdfs/client_v1.rb', line 61 def rename(path, dest, ={}) (, OPT_TABLE['RENAME']) unless dest.start_with?('/') dest = '/' + dest end res = operate_requests('PUT', path, 'RENAME', .merge({'destination' => dest})) check_success_json(res, 'boolean') end |
#replication(path, replnum, options = {}) ⇒ Object Also known as: setreplication
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
[&replication=<SHORT>]"
143 144 145 146 147 |
# File 'lib/webhdfs/client_v1.rb', line 143 def replication(path, replnum, ={}) (, OPT_TABLE['SETREPLICATION']) res = operate_requests('PUT', path, 'SETREPLICATION', .merge({'replication' => replnum.to_s})) check_success_json(res, 'boolean') end |
#request(host, port, method, path, op = nil, params = {}, payload = nil) ⇒ Object
IllegalArgumentException 400 Bad Request UnsupportedOperationException 400 Bad Request SecurityException 401 Unauthorized IOException 403 Forbidden FileNotFoundException 404 Not Found RumtimeException 500 Internal Server Error
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/webhdfs/client_v1.rb', line 232 def request(host, port, method, path, op=nil, params={}, payload=nil) conn = Net::HTTP.start(host, port) conn.open_timeout = @open_timeout if @open_timeout conn.read_timeout = @read_timeout if @read_timeout request_path = if op build_path(path, op, params) else path end p({:host => host, :port => port, :method => method, :path => request_path}) res = conn.send_request(method, request_path, payload) case res when Net::HTTPSuccess res when Net::HTTPRedirection res else = if res.body and not res.body.empty? res.body.gsub(/\n/, '') else 'Response body is empty...' end case res.code when '400' raise WebHDFS::ClientError, when '401' raise WebHDFS::SecurityError, when '403' raise WebHDFS::IOError, when '404' raise WebHDFS::FileNotFoundError, when '500' raise WebHDFS::ServerError, else raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{}" end end end |
#stat(path, options = {}) ⇒ Object Also known as: getfilestatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”
80 81 82 83 84 |
# File 'lib/webhdfs/client_v1.rb', line 80 def stat(path, ={}) (, OPT_TABLE['GETFILESTATUS']) res = operate_requests('GET', path, 'GETFILESTATUS', ) check_success_json(res, 'FileStatus') end |
#touch(path, options = {}) ⇒ Object Also known as: settimes
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES
[&modificationtime=<TIME>][&accesstime=<TIME>]"
motidicationtime: radix-10 logn integer accesstime: radix-10 logn integer
154 155 156 157 158 159 160 161 |
# File 'lib/webhdfs/client_v1.rb', line 154 def touch(path, ={}) (, OPT_TABLE['SETTIMES']) unless .has_key?('modificationtime') or .has_key?('accesstime') raise ArgumentError, "'chown' needs at least one of modificationtime or accesstime" end res = operate_requests('PUT', path, 'SETTIMES', ) res.code == '200' end |