Class: WebHDFS::ClientV1
- Inherits:
-
Object
- Object
- WebHDFS::ClientV1
- Defined in:
- lib/webhdfs/client_v1.rb
Direct Known Subclasses
Constant Summary collapse
- OPT_TABLE =
This hash table holds command options.
{}
- REDIRECTED_OPERATIONS =
['APPEND' ,'CREATE' , 'OPEN', 'GETFILECHECKSUM']
- REDIRECTED_CODE =
(300..399)
Instance Attribute Summary collapse
-
#auth_type ⇒ Object
pseudo, kerberos.
-
#doas ⇒ Object
internal use only.
-
#host ⇒ Object
internal use only.
-
#httpfs_mode ⇒ Object
Returns the value of attribute httpfs_mode.
-
#keytab ⇒ Object
Returns the value of attribute keytab.
-
#open_timeout ⇒ Object
default 30s (in ruby net/http).
-
#pass ⇒ Object
Returns the value of attribute pass.
-
#port ⇒ Object
internal use only.
-
#read_timeout ⇒ Object
default 60s (in ruby net/http).
-
#username ⇒ Object
internal use only.
Instance Method Summary collapse
- #api_path(path) ⇒ Object
-
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND”.
- #build_path(path, op, params) ⇒ Object
-
#check_options(options, optdecl = []) ⇒ Object
def delegation_token(user, options={}) # GETDELEGATIONTOKEN raise NotImplementedError end def renew_delegation_token(token, options={}) # RENEWDELEGATIONTOKEN raise NotImplementedError end def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN raise NotImplementedError end.
- #check_success_json(res, attr = nil) ⇒ Object
-
#checksum(path, options = {}) ⇒ Object
(also: #getfilechecksum)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”.
-
#chmod(path, mode, options = {}) ⇒ Object
(also: #setpermission)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION [&permission=<OCTAL>]”.
-
#chown(path, options = {}) ⇒ Object
(also: #setowner)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER [&owner=<USER>]”.
-
#content_summary(path, options = {}) ⇒ Object
(also: #getcontentsummary)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”.
-
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE [&overwrite=<true|false>][&replication=<SHORT>] [&permission=<OCTAL>]”.
-
#create_symlink(path, dest, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATESYMLINK&destination=<PATH> [&createParent=<true|false>]”.
-
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE [&recursive=<true|false>]”.
-
#homedir(options = {}) ⇒ Object
(also: #gethomedirectory)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”.
-
#initialize(host = 'localhost', port = 14000, username = nil, doas = nil) ⇒ ClientV1
constructor
A new instance of ClientV1.
-
#list(path, options = {}) ⇒ Object
(also: #liststatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”.
-
#mkdir(path, options = {}) ⇒ Object
(also: #mkdirs)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS”.
- #operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
-
#read(path, options = {}) ⇒ Object
(also: #open)
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN [&offset=<LONG>][&buffersize=<INT>]”.
-
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”.
-
#replication(path, replnum, options = {}) ⇒ Object
(also: #setreplication)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION [&replication=<SHORT>]”.
-
#request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil) ⇒ Object
IllegalArgumentException: 400 Bad Request; UnsupportedOperationException: 400 Bad Request; SecurityException: 401 Unauthorized; IOException: 403 Forbidden; FileNotFoundException: 404 Not Found; RuntimeException: 500 Internal Server Error.
-
#stat(path, options = {}) ⇒ Object
(also: #getfilestatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”.
-
#touch(path, options = {}) ⇒ Object
(also: #settimes)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES [&modificationtime=<TIME>]” modificationtime: radix-10 long integer; accesstime: radix-10 long integer.
Constructor Details
#initialize(host = 'localhost', port = 14000, username = nil, doas = nil) ⇒ ClientV1
Returns a new instance of ClientV1.
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/webhdfs/client_v1.rb', line 24 def initialize(host='localhost', port=14000, username=nil, doas=nil) @host = host @port = port @username = username @doas = doas @httpfs_mode = false @auth_type = :pseudo @keytab = nil @pass = nil end |
Instance Attribute Details
#auth_type ⇒ Object
pseudo, kerberos
19 20 21 |
# File 'lib/webhdfs/client_v1.rb', line 19 def auth_type @auth_type end |
#doas ⇒ Object
internal use only
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def doas @doas end |
#host ⇒ Object
internal use only
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def host @host end |
#httpfs_mode ⇒ Object
Returns the value of attribute httpfs_mode.
18 19 20 |
# File 'lib/webhdfs/client_v1.rb', line 18 def httpfs_mode @httpfs_mode end |
#keytab ⇒ Object
Returns the value of attribute keytab.
20 21 22 |
# File 'lib/webhdfs/client_v1.rb', line 20 def keytab @keytab end |
#open_timeout ⇒ Object
default 30s (in ruby net/http)
16 17 18 |
# File 'lib/webhdfs/client_v1.rb', line 16 def open_timeout @open_timeout end |
#pass ⇒ Object
Returns the value of attribute pass.
21 22 23 |
# File 'lib/webhdfs/client_v1.rb', line 21 def pass @pass end |
#port ⇒ Object
internal use only
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def port @port end |
#read_timeout ⇒ Object
default 60s (in ruby net/http)
17 18 19 |
# File 'lib/webhdfs/client_v1.rb', line 17 def read_timeout @read_timeout end |
#username ⇒ Object
internal use only
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def username @username end |
Instance Method Details
#api_path(path) ⇒ Object
218 219 220 221 222 223 224 |
# File 'lib/webhdfs/client_v1.rb', line 218 def api_path(path) if path.start_with?('/') '/webhdfs/v1' + path else '/webhdfs/v1/' + path end end |
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND”
63 64 65 66 67 68 69 70 |
# File 'lib/webhdfs/client_v1.rb', line 63 def append(path, body, options={}) if @httpfs_mode options = options.merge({'data' => 'true'}) end check_options(options, OPT_TABLE['APPEND']) res = operate_requests(:post, path, 'APPEND', options, body) res.code == 200 end |
#build_path(path, op, params) ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/webhdfs/client_v1.rb', line 226 def build_path(path, op, params) opts = if @username and @doas {'op' => op, 'user.name' => @username, 'doas' => @doas} elsif @username {'op' => op, 'user.name' => @username} elsif @doas {'op' => op, 'doas' => @doas} else {'op' => op} end query = URI.encode_www_form(params.merge(opts)) api_path(path) + '?' + query end |
#check_options(options, optdecl = []) ⇒ Object
def delegation_token(user, options={}) # GETDELEGATIONTOKEN
raise NotImplementedError
end def renew_delegation_token(token, options={}) # RENEWDELEGATIONTOKEN
raise NotImplementedError
end def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN
raise NotImplementedError
end
209 210 211 212 |
# File 'lib/webhdfs/client_v1.rb', line 209 def check_options(options, optdecl=[]) ex = options.keys.map(&:to_s) - (optdecl || []) raise ArgumentError, "no such option: #{ex.join(' ')}" unless ex.empty? end |
#check_success_json(res, attr = nil) ⇒ Object
214 215 216 |
# File 'lib/webhdfs/client_v1.rb', line 214 def check_success_json(res, attr=nil) res.code == 200 and res.headers['Content-Type'].include?('application/json') and (attr.nil? or JSON.parse(res.body)[attr]) end |
#checksum(path, options = {}) ⇒ Object Also known as: getfilechecksum
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”
136 137 138 139 140 |
# File 'lib/webhdfs/client_v1.rb', line 136 def checksum(path, options={}) check_options(options, OPT_TABLE['GETFILECHECKSUM']) res = operate_requests(:get, path, 'GETFILECHECKSUM', options) check_success_json(res, 'FileChecksum') end |
#chmod(path, mode, options = {}) ⇒ Object Also known as: setpermission
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
[&permission=<OCTAL>]"
153 154 155 156 157 |
# File 'lib/webhdfs/client_v1.rb', line 153 def chmod(path, mode, options={}) check_options(options, OPT_TABLE['SETPERMISSION']) res = operate_requests(:put, path, 'SETPERMISSION', options.merge({'permission' => mode})) res.code == 200 end |
#chown(path, options = {}) ⇒ Object Also known as: setowner
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
[&owner=<USER>][&group=<GROUP>]"
162 163 164 165 166 167 168 169 170 |
# File 'lib/webhdfs/client_v1.rb', line 162 def chown(path, options={}) check_options(options, OPT_TABLE['SETOWNER']) unless options.has_key?('owner') or options.has_key?('group') or options.has_key?(:owner) or options.has_key?(:group) raise ArgumentError, "'chown' needs at least one of owner or group" end res = operate_requests(:put, path, 'SETOWNER', options) res.code == 200 end |
#content_summary(path, options = {}) ⇒ Object Also known as: getcontentsummary
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”
128 129 130 131 132 |
# File 'lib/webhdfs/client_v1.rb', line 128 def content_summary(path, options={}) check_options(options, OPT_TABLE['GETCONTENTSUMMARY']) res = operate_requests(:get, path, 'GETCONTENTSUMMARY', options) check_success_json(res, 'ContentSummary') end |
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
[&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
[&permission=<OCTAL>][&buffersize=<INT>]"
52 53 54 55 56 57 58 59 |
# File 'lib/webhdfs/client_v1.rb', line 52 def create(path, body, options={}) if @httpfs_mode options = options.merge({'data' => 'true'}) end check_options(options, OPT_TABLE['CREATE']) res = operate_requests(:put, path, 'CREATE', options, body) res.code == 201 end |
#create_symlink(path, dest, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATESYMLINK&destination=<PATH>
[&createParent=<true|false>]"
39 40 41 42 43 44 45 46 |
# File 'lib/webhdfs/client_v1.rb', line 39 def create_symlink(path, dest, options={}) check_options(options, OPT_TABLE['CREATESYMLINK']) unless dest.start_with?('/') dest = '/' + dest end res = operate_requests(:put, path, 'CREATESYMLINK', options.merge({'destination' => dest})) check_success_json(res, 'boolean') end |
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
[&recursive=<true|false>]"
104 105 106 107 108 |
# File 'lib/webhdfs/client_v1.rb', line 104 def delete(path, options={}) check_options(options, OPT_TABLE['DELETE']) res = operate_requests(:delete, path, 'DELETE', options) check_success_json(res, 'boolean') end |
#homedir(options = {}) ⇒ Object Also known as: gethomedirectory
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”
144 145 146 147 148 |
# File 'lib/webhdfs/client_v1.rb', line 144 def homedir(options={}) check_options(options, OPT_TABLE['GETHOMEDIRECTORY']) res = operate_requests(:get, '/', 'GETHOMEDIRECTORY', options) check_success_json(res, 'Path') end |
#list(path, options = {}) ⇒ Object Also known as: liststatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”
120 121 122 123 124 |
# File 'lib/webhdfs/client_v1.rb', line 120 def list(path, options={}) check_options(options, OPT_TABLE['LISTSTATUS']) res = operate_requests(:get, path, 'LISTSTATUS', options) check_success_json(res, 'FileStatuses')['FileStatus'] end |
#mkdir(path, options = {}) ⇒ Object Also known as: mkdirs
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS”
84 85 86 87 88 |
# File 'lib/webhdfs/client_v1.rb', line 84 def mkdir(path, options={}) check_options(options, OPT_TABLE['MKDIRS']) res = operate_requests(:put, path, 'MKDIRS', options) check_success_json(res, 'boolean') end |
#operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/webhdfs/client_v1.rb', line 243 def operate_requests(method, path, op, params={}, payload=nil) if not @httpfs_mode and REDIRECTED_OPERATIONS.include?(op) res = request(@host, @port, method, path, op, params, nil) unless REDIRECTED_CODE.include?(res.code) and res.headers['Location'] msg = "NameNode returns non-redirection (or without location header), code:#{res.code}, body:#{res.body}." raise WebHDFS::RequestFailedError, msg end uri = URI.parse(res.headers['Location']) rpath = if uri.query uri.path + '?' + uri.query else uri.path end request(uri.host, uri.port, method, rpath, nil, {}, payload, {'Content-Type' => 'application/octet-stream'}) else if @httpfs_mode and not payload.nil? request(@host, @port, method, path, op, params, payload, {'Content-Type' => 'application/octet-stream'}) else request(@host, @port, method, path, op, params, payload) end end end |
#read(path, options = {}) ⇒ Object Also known as: open
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
75 76 77 78 79 |
# File 'lib/webhdfs/client_v1.rb', line 75 def read(path, options={}) check_options(options, OPT_TABLE['OPEN']) res = operate_requests(:get, path, 'OPEN', options) res.body end |
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”
93 94 95 96 97 98 99 100 |
# File 'lib/webhdfs/client_v1.rb', line 93 def rename(path, dest, options={}) check_options(options, OPT_TABLE['RENAME']) unless dest.start_with?('/') dest = '/' + dest end res = operate_requests(:put, path, 'RENAME', options.merge({'destination' => dest})) check_success_json(res, 'boolean') end |
#replication(path, replnum, options = {}) ⇒ Object Also known as: setreplication
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
[&replication=<SHORT>]"
176 177 178 179 180 |
# File 'lib/webhdfs/client_v1.rb', line 176 def replication(path, replnum, options={}) check_options(options, OPT_TABLE['SETREPLICATION']) res = operate_requests(:put, path, 'SETREPLICATION', options.merge({'replication' => replnum.to_s})) check_success_json(res, 'boolean') end |
#request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil) ⇒ Object
IllegalArgumentException: 400 Bad Request; UnsupportedOperationException: 400 Bad Request; SecurityException: 401 Unauthorized; IOException: 403 Forbidden; FileNotFoundException: 404 Not Found; RuntimeException: 500 Internal Server Error
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 |
# File 'lib/webhdfs/client_v1.rb', line 280 def request(host, port, method, path, op=nil, params={}, payload=nil, header=nil) req = HTTPI::Request.new krb5 = nil HTTPI.log = true # disable logging HTTPI.log_level= :debug HTTPI.adapter = :net_http # one of [:httpclient, :curb, :net_http] if @auth_type == :kerberos if @username krb5 = Krb5Auth::Krb5.new inited = false; begin if @keytab inited = krb5.get_init_creds_keytab(@username, @keytab) elsif @pass inited = krb5.get_init_creds_password(@username, @pass) else raise ArgumentError, "kerberos authentication requires keytab or password" end rescue raise WebHDFS::SecurityError, "kerberos initialization is failed" end if inited krb5.cache HTTPI.adapter = :curb req.auth.gssnegotiate end end end req.open_timeout = @open_timeout if @open_timeout req.read_timeout = @read_timeout if @read_timeout request_path = if op build_path(path, op, params) else path end req.url = URI::HTTP.build({:host => host, :port => port}) + request_path req.headers = header.nil? ? {} : header # MUST BE ASSIGN {} if nil BY zixian.shen req.body = payload.nil? ? {} : payload # MUST BE ASSIGN {} if nil BY zixian.shen res = HTTPI.request( method, req ) if HTTPI::Response::SuccessfulResponseCodes.include?(res.code) res elsif REDIRECTED_CODE.include?(res.code) res else message = if res.body and not res.body.empty? res.body.gsub(/\n/, '') else 'Response body is empty...' end case res.code when 400 raise WebHDFS::ClientError, message when 401 raise WebHDFS::SecurityError, message when 403 raise WebHDFS::IOError, message when 404 raise WebHDFS::FileNotFoundError, message when 500 raise WebHDFS::ServerError, message else raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{message}" end end end |
#stat(path, options = {}) ⇒ Object Also known as: getfilestatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”
112 113 114 115 116 |
# File 'lib/webhdfs/client_v1.rb', line 112 def stat(path, options={}) check_options(options, OPT_TABLE['GETFILESTATUS']) res = operate_requests(:get, path, 'GETFILESTATUS', options) check_success_json(res, 'FileStatus') end |
#touch(path, options = {}) ⇒ Object Also known as: settimes
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES
[&modificationtime=<TIME>][&accesstime=<TIME>]"
modificationtime: radix-10 long integer; accesstime: radix-10 long integer
187 188 189 190 191 192 193 194 195 |
# File 'lib/webhdfs/client_v1.rb', line 187 def touch(path, options={}) check_options(options, OPT_TABLE['SETTIMES']) unless options.has_key?('modificationtime') or options.has_key?('accesstime') or options.has_key?(:modificationtime) or options.has_key?(:accesstime) raise ArgumentError, "'chown' needs at least one of modificationtime or accesstime" end res = operate_requests(:put, path, 'SETTIMES', options) res.code == 200 end |