Class: WebHDFS::ClientV1
- Inherits:
-
Object
- Object
- WebHDFS::ClientV1
- Defined in:
- lib/webhdfs/client_v1.rb
Direct Known Subclasses
Constant Summary collapse
- OPT_TABLE =
This hash table holds command options.
{}
- KNOWN_ERRORS =
internal use only
['LeaseExpiredException'].freeze
- SSL_VERIFY_MODES =
[:none, :peer]
- REDIRECTED_OPERATIONS =
['APPEND', 'CREATE', 'OPEN', 'GETFILECHECKSUM']
Instance Attribute Summary collapse
-
#doas ⇒ Object
Returns the value of attribute doas.
-
#host ⇒ Object
Returns the value of attribute host.
-
#http_headers ⇒ Object
Returns the value of attribute http_headers.
-
#httpfs_mode ⇒ Object
Returns the value of attribute httpfs_mode.
-
#kerberos ⇒ Object
Returns the value of attribute kerberos.
-
#kerberos_delegation_token ⇒ Object
Returns the value of attribute kerberos_delegation_token.
-
#kerberos_keytab ⇒ Object
Returns the value of attribute kerberos_keytab.
-
#open_timeout ⇒ Object
default 30s (in ruby net/http).
-
#port ⇒ Object
Returns the value of attribute port.
-
#proxy_address ⇒ Object
Returns the value of attribute proxy_address.
-
#proxy_pass ⇒ Object
Returns the value of attribute proxy_pass.
-
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
-
#proxy_user ⇒ Object
Returns the value of attribute proxy_user.
-
#read_timeout ⇒ Object
default 60s (in ruby net/http).
-
#retry_interval ⇒ Object
default 1 ([sec], ignored when retry_known_errors is false).
-
#retry_known_errors ⇒ Object
default false (not to retry).
-
#retry_times ⇒ Object
default 1 (ignored when retry_known_errors is false).
-
#ssl ⇒ Object
Returns the value of attribute ssl.
-
#ssl_ca_file ⇒ Object
Returns the value of attribute ssl_ca_file.
-
#ssl_cert ⇒ Object
Returns the value of attribute ssl_cert.
-
#ssl_key ⇒ Object
Returns the value of attribute ssl_key.
-
#ssl_verify_mode ⇒ Object
Returns the value of attribute ssl_verify_mode.
-
#ssl_version ⇒ Object
Returns the value of attribute ssl_version.
-
#username ⇒ Object
Returns the value of attribute username.
Instance Method Summary collapse
- #api_path(path) ⇒ Object
-
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND [&buffersize=<INT>]”.
- #build_path(path, op, params) ⇒ Object
-
#check_options(options, optdecl = []) ⇒ Object
def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN raise NotImplementedError end.
- #check_success_json(res, attr = nil) ⇒ Object
-
#checksum(path, options = {}) ⇒ Object
(also: #getfilechecksum)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”.
-
#chmod(path, mode, options = {}) ⇒ Object
(also: #setpermission)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION [&permission=<OCTAL>]”.
-
#chown(path, options = {}) ⇒ Object
(also: #setowner)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER [&owner=<USER>]”.
-
#content_summary(path, options = {}) ⇒ Object
(also: #getcontentsummary)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”.
-
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE [&overwrite=<true|false>][&replication=<SHORT>] [&permission=<OCTAL>] [&delegation=<DELEGATION_TOKEN>]”.
-
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE [&recursive=<true|false>]”.
- #get_cached_kerberos_delegation_token(force_renew = nil) ⇒ Object
-
#get_kerberos_delegation_token(user, options = {}) ⇒ Object
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=<USER>”.
-
#gettrashroot(options = {}) ⇒ Object
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETTRASHROOT”.
-
#homedir(options = {}) ⇒ Object
(also: #gethomedirectory)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”.
-
#initialize(host = 'localhost', port = 50070, username = nil, doas = nil, proxy_address = nil, proxy_port = nil, http_headers = {}, renew_kerberos_delegation_token_time_hour = nil) ⇒ ClientV1
constructor
A new instance of ClientV1.
-
#list(path, options = {}) ⇒ Object
(also: #liststatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”.
-
#mkdir(path, options = {}) ⇒ Object
(also: #mkdirs)
curl -i -X PUT “<HOST>:<PORT>/<PATH>?op=MKDIRS”.
- #operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
-
#read(path, options = {}) ⇒ Object
(also: #open)
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN [&offset=<LONG>][&buffersize=<INT>]”.
-
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”.
-
#renew_kerberos_delegation_token(token, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/?op=RENEWDELEGATIONTOKEN&token=<DELEGATION_TOKEN>”.
-
#replication(path, replnum, options = {}) ⇒ Object
(also: #setreplication)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION [&replication=<SHORT>]”.
-
#request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil, retries = 0) ⇒ Object
IllegalArgumentException 400 Bad Request UnsupportedOperationException 400 Bad Request SecurityException 401 Unauthorized IOException 403 Forbidden FileNotFoundException 404 Not Found RumtimeException 500 Internal Server Error.
- #should_kerberos_token_updated? ⇒ Boolean
-
#stat(path, options = {}) ⇒ Object
(also: #getfilestatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”.
-
#touch(path, options = {}) ⇒ Object
(also: #settimes)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES [&modificationtime=<TIME>]” motidicationtime: radix-10 logn integer accesstime: radix-10 logn integer.
Constructor Details
#initialize(host = 'localhost', port = 50070, username = nil, doas = nil, proxy_address = nil, proxy_port = nil, http_headers = {}, renew_kerberos_delegation_token_time_hour = nil) ⇒ ClientV1
Returns a new instance of ClientV1.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/webhdfs/client_v1.rb', line 41 def initialize(host='localhost', port=50070, username=nil, doas=nil, proxy_address=nil, proxy_port=nil, http_headers={}, renew_kerberos_delegation_token_time_hour=nil) @host = host @port = port @username = username @doas = doas @proxy_address = proxy_address @proxy_port = proxy_port @retry_known_errors = false @retry_times = 1 @retry_interval = 1 @httpfs_mode = false @ssl = false @ssl_ca_file = nil @ssl_verify_mode = nil @ssl_cert = nil @ssl_key = nil @ssl_version = nil @kerberos = false @kerberos_keytab = nil @renew_kerberos_delegation_token_time_hour = renew_kerberos_delegation_token_time_hour @kerberos_delegation_token = nil @kerberos_token_updated_at = Time.now @http_headers = http_headers end |
Instance Attribute Details
#doas ⇒ Object
Returns the value of attribute doas.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def doas @doas end |
#host ⇒ Object
Returns the value of attribute host.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def host @host end |
#http_headers ⇒ Object
Returns the value of attribute http_headers.
30 31 32 |
# File 'lib/webhdfs/client_v1.rb', line 30 def http_headers @http_headers end |
#httpfs_mode ⇒ Object
Returns the value of attribute httpfs_mode.
19 20 21 |
# File 'lib/webhdfs/client_v1.rb', line 19 def httpfs_mode @httpfs_mode end |
#kerberos ⇒ Object
Returns the value of attribute kerberos.
29 30 31 |
# File 'lib/webhdfs/client_v1.rb', line 29 def kerberos @kerberos end |
#kerberos_delegation_token ⇒ Object
Returns the value of attribute kerberos_delegation_token.
31 32 33 |
# File 'lib/webhdfs/client_v1.rb', line 31 def kerberos_delegation_token @kerberos_delegation_token end |
#kerberos_keytab ⇒ Object
Returns the value of attribute kerberos_keytab.
29 30 31 |
# File 'lib/webhdfs/client_v1.rb', line 29 def kerberos_keytab @kerberos_keytab end |
#open_timeout ⇒ Object
default 30s (in ruby net/http)
17 18 19 |
# File 'lib/webhdfs/client_v1.rb', line 17 def open_timeout @open_timeout end |
#port ⇒ Object
Returns the value of attribute port.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def port @port end |
#proxy_address ⇒ Object
Returns the value of attribute proxy_address.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def proxy_address @proxy_address end |
#proxy_pass ⇒ Object
Returns the value of attribute proxy_pass.
16 17 18 |
# File 'lib/webhdfs/client_v1.rb', line 16 def proxy_pass @proxy_pass end |
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def proxy_port @proxy_port end |
#proxy_user ⇒ Object
Returns the value of attribute proxy_user.
16 17 18 |
# File 'lib/webhdfs/client_v1.rb', line 16 def proxy_user @proxy_user end |
#read_timeout ⇒ Object
default 60s (in ruby net/http)
18 19 20 |
# File 'lib/webhdfs/client_v1.rb', line 18 def read_timeout @read_timeout end |
#retry_interval ⇒ Object
default 1 ([sec], ignored when retry_known_errors is false)
22 23 24 |
# File 'lib/webhdfs/client_v1.rb', line 22 def retry_interval @retry_interval end |
#retry_known_errors ⇒ Object
default false (not to retry)
20 21 22 |
# File 'lib/webhdfs/client_v1.rb', line 20 def retry_known_errors @retry_known_errors end |
#retry_times ⇒ Object
default 1 (ignored when retry_known_errors is false)
21 22 23 |
# File 'lib/webhdfs/client_v1.rb', line 21 def retry_times @retry_times end |
#ssl ⇒ Object
Returns the value of attribute ssl.
23 24 25 |
# File 'lib/webhdfs/client_v1.rb', line 23 def ssl @ssl end |
#ssl_ca_file ⇒ Object
Returns the value of attribute ssl_ca_file.
24 25 26 |
# File 'lib/webhdfs/client_v1.rb', line 24 def ssl_ca_file @ssl_ca_file end |
#ssl_cert ⇒ Object
Returns the value of attribute ssl_cert.
26 27 28 |
# File 'lib/webhdfs/client_v1.rb', line 26 def ssl_cert @ssl_cert end |
#ssl_key ⇒ Object
Returns the value of attribute ssl_key.
27 28 29 |
# File 'lib/webhdfs/client_v1.rb', line 27 def ssl_key @ssl_key end |
#ssl_verify_mode ⇒ Object
Returns the value of attribute ssl_verify_mode.
25 26 27 |
# File 'lib/webhdfs/client_v1.rb', line 25 def ssl_verify_mode @ssl_verify_mode end |
#ssl_version ⇒ Object
Returns the value of attribute ssl_version.
28 29 30 |
# File 'lib/webhdfs/client_v1.rb', line 28 def ssl_version @ssl_version end |
#username ⇒ Object
Returns the value of attribute username.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15 def username @username end |
Instance Method Details
#api_path(path) ⇒ Object
284 285 286 287 288 289 290 |
# File 'lib/webhdfs/client_v1.rb', line 284 def api_path(path) if path.start_with?('/') '/webhdfs/v1' + path else '/webhdfs/v1/' + path end end |
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND
[&buffersize=<INT>][&delegation=<DELEGATION_TOKEN>]"
106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/webhdfs/client_v1.rb', line 106 def append(path, body, ={}) if @httpfs_mode = .merge({'data' => 'true'}) end if @renew_kerberos_delegation_token_time_hour = .merge('delegation' => get_cached_kerberos_delegation_token) end (, OPT_TABLE['APPEND']) res = operate_requests('POST', path, 'APPEND', , body) res.code == '200' end |
#build_path(path, op, params) ⇒ Object
292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/webhdfs/client_v1.rb', line 292 def build_path(path, op, params) opts = if @username and @doas {'op' => op, 'user.name' => @username, 'doas' => @doas} elsif @username {'op' => op, 'user.name' => @username} elsif @doas {'op' => op, 'doas' => @doas} else {'op' => op} end query = URI.encode_www_form(params.merge(opts)) api_path(path) + '?' + query end |
#check_options(options, optdecl = []) ⇒ Object
def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN
raise NotImplementedError
end
275 276 277 278 |
# File 'lib/webhdfs/client_v1.rb', line 275 def (, optdecl=[]) ex = .keys.map(&:to_s) - (optdecl || []) raise ArgumentError, "no such option: #{ex.join(' ')}" unless ex.empty? end |
#check_success_json(res, attr = nil) ⇒ Object
280 281 282 |
# File 'lib/webhdfs/client_v1.rb', line 280 def check_success_json(res, attr=nil) res.code == '200' and res.content_type == 'application/json' and (attr.nil? or JSON.parse(res.body)[attr]) end |
#checksum(path, options = {}) ⇒ Object Also known as: getfilechecksum
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”
182 183 184 185 186 |
# File 'lib/webhdfs/client_v1.rb', line 182 def checksum(path, ={}) (, OPT_TABLE['GETFILECHECKSUM']) res = operate_requests('GET', path, 'GETFILECHECKSUM', ) check_success_json(res, 'FileChecksum') end |
#chmod(path, mode, options = {}) ⇒ Object Also known as: setpermission
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
[&=<OCTAL>]"
206 207 208 209 210 |
# File 'lib/webhdfs/client_v1.rb', line 206 def chmod(path, mode, ={}) (, OPT_TABLE['SETPERMISSION']) res = operate_requests('PUT', path, 'SETPERMISSION', .merge({'permission' => mode})) res.code == '200' end |
#chown(path, options = {}) ⇒ Object Also known as: setowner
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
[&owner=<USER>][&group=<GROUP>]"
215 216 217 218 219 220 221 222 223 |
# File 'lib/webhdfs/client_v1.rb', line 215 def chown(path, ={}) (, OPT_TABLE['SETOWNER']) unless .has_key?('owner') or .has_key?('group') or .has_key?(:owner) or .has_key?(:group) raise ArgumentError, "'chown' needs at least one of owner or group" end res = operate_requests('PUT', path, 'SETOWNER', ) res.code == '200' end |
#content_summary(path, options = {}) ⇒ Object Also known as: getcontentsummary
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”
174 175 176 177 178 |
# File 'lib/webhdfs/client_v1.rb', line 174 def content_summary(path, ={}) (, OPT_TABLE['GETCONTENTSUMMARY']) res = operate_requests('GET', path, 'GETCONTENTSUMMARY', ) check_success_json(res, 'ContentSummary') end |
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
[&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
[&=<OCTAL>][&buffersize=<INT>]
[&delegation=<DELEGATION_TOKEN>]"
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/webhdfs/client_v1.rb', line 91 def create(path, body, ={}) if @httpfs_mode = .merge({'data' => 'true'}) end if @renew_kerberos_delegation_token_time_hour = .merge('delegation' => get_cached_kerberos_delegation_token) end (, OPT_TABLE['CREATE']) res = operate_requests('PUT', path, 'CREATE', , body) res.code == '201' end |
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
[&recursive=<true|false>]"
150 151 152 153 154 |
# File 'lib/webhdfs/client_v1.rb', line 150 def delete(path, ={}) (, OPT_TABLE['DELETE']) res = operate_requests('DELETE', path, 'DELETE', ) check_success_json(res, 'boolean') end |
#get_cached_kerberos_delegation_token(force_renew = nil) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/webhdfs/client_v1.rb', line 74 def get_cached_kerberos_delegation_token(force_renew=nil) return @kerberos_delegation_token if @kerberos_delegation_token && !should_kerberos_token_updated? && !force_renew if !@kerberos_delegation_token || force_renew @kerberos_delegation_token = get_kerberos_delegation_token(@username) @kerberos_token_updated_at = Time.now else renew_kerberos_delegation_token(@kerberos_delegation_token) @kerberos_token_updated_at = Time.now end @kerberos_delegation_token end |
#get_kerberos_delegation_token(user, options = {}) ⇒ Object
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=<USER>”
253 254 255 256 257 258 259 |
# File 'lib/webhdfs/client_v1.rb', line 253 def get_kerberos_delegation_token(user, ={}) = .merge({ 'renewer' => user }) (, OPT_TABLE['GETDELEGATIONTOKEN']) res = operate_requests('GET', '/', 'GETDELEGATIONTOKEN', ) check_success_json(res, 'Token') JSON.parse(res.body)['Token']['urlString'] end |
#gettrashroot(options = {}) ⇒ Object
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETTRASHROOT”
198 199 200 201 202 |
# File 'lib/webhdfs/client_v1.rb', line 198 def gettrashroot(={}) (, OPT_TABLE['GETTRASHROOT']) res = operate_requests('GET', '/', 'GETTRASHROOT', ) check_success_json(res, 'Path') end |
#homedir(options = {}) ⇒ Object Also known as: gethomedirectory
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”
190 191 192 193 194 |
# File 'lib/webhdfs/client_v1.rb', line 190 def homedir(={}) (, OPT_TABLE['GETHOMEDIRECTORY']) res = operate_requests('GET', '/', 'GETHOMEDIRECTORY', ) check_success_json(res, 'Path') end |
#list(path, options = {}) ⇒ Object Also known as: liststatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”
166 167 168 169 170 |
# File 'lib/webhdfs/client_v1.rb', line 166 def list(path, ={}) (, OPT_TABLE['LISTSTATUS']) res = operate_requests('GET', path, 'LISTSTATUS', ) check_success_json(res, 'FileStatuses')['FileStatus'] end |
#mkdir(path, options = {}) ⇒ Object Also known as: mkdirs
curl -i -X PUT “<HOST>:<PORT>/<PATH>?op=MKDIRS”
130 131 132 133 134 |
# File 'lib/webhdfs/client_v1.rb', line 130 def mkdir(path, ={}) (, OPT_TABLE['MKDIRS']) res = operate_requests('PUT', path, 'MKDIRS', ) check_success_json(res, 'boolean') end |
#operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 |
# File 'lib/webhdfs/client_v1.rb', line 307 def operate_requests(method, path, op, params={}, payload=nil) if not @httpfs_mode and REDIRECTED_OPERATIONS.include?(op) res = request(@host, @port, method, path, op, params, nil) unless res.is_a?(Net::HTTPRedirection) and res['location'] msg = "NameNode returns non-redirection (or without location header), code:#{res.code}, body:#{res.body}." raise WebHDFS::RequestFailedError, msg end uri = URI.parse(res['location']) rpath = if uri.query uri.path + '?' + uri.query else uri.path end request(uri.host, uri.port, method, rpath, nil, {}, payload, {'Content-Type' => 'application/octet-stream'}) else if @httpfs_mode and not payload.nil? request(@host, @port, method, path, op, params, payload, {'Content-Type' => 'application/octet-stream'}) else request(@host, @port, method, path, op, params, payload) end end end |
#read(path, options = {}) ⇒ Object Also known as: open
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
121 122 123 124 125 |
# File 'lib/webhdfs/client_v1.rb', line 121 def read(path, ={}) (, OPT_TABLE['OPEN']) res = operate_requests('GET', path, 'OPEN', ) res.body end |
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”
139 140 141 142 143 144 145 146 |
# File 'lib/webhdfs/client_v1.rb', line 139 def rename(path, dest, ={}) (, OPT_TABLE['RENAME']) unless dest.start_with?('/') dest = '/' + dest end res = operate_requests('PUT', path, 'RENAME', .merge({'destination' => dest})) check_success_json(res, 'boolean') end |
#renew_kerberos_delegation_token(token, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/?op=RENEWDELEGATIONTOKEN&token=<DELEGATION_TOKEN>”
263 264 265 266 267 268 |
# File 'lib/webhdfs/client_v1.rb', line 263 def renew_kerberos_delegation_token(token, ={}) = .merge({ 'token' => token }) (, OPT_TABLE['RENEWDELEGATIONTOKEN']) res = operate_requests('PUT', '/', 'RENEWDELEGATIONTOKEN', ) check_success_json(res, 'long') end |
#replication(path, replnum, options = {}) ⇒ Object Also known as: setreplication
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
[&replication=<SHORT>]"
229 230 231 232 233 |
# File 'lib/webhdfs/client_v1.rb', line 229 def replication(path, replnum, ={}) (, OPT_TABLE['SETREPLICATION']) res = operate_requests('PUT', path, 'SETREPLICATION', .merge({'replication' => replnum.to_s})) check_success_json(res, 'boolean') end |
#request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil, retries = 0) ⇒ Object
IllegalArgumentException 400 Bad Request UnsupportedOperationException 400 Bad Request SecurityException 401 Unauthorized IOException 403 Forbidden FileNotFoundException 404 Not Found RumtimeException 500 Internal Server Error
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 |
# File 'lib/webhdfs/client_v1.rb', line 336 def request(host, port, method, path, op=nil, params={}, payload=nil, header=nil, retries=0) conn = Net::HTTP.new(host, port, @proxy_address, @proxy_port) conn.proxy_user = @proxy_user if @proxy_user conn.proxy_pass = @proxy_pass if @proxy_pass conn.open_timeout = @open_timeout if @open_timeout conn.read_timeout = @read_timeout if @read_timeout path = Addressable::URI.escape(path) # make path safe for transmission via HTTP request_path = if op build_path(path, op, params) else path end if @ssl conn.use_ssl = true conn.ca_file = @ssl_ca_file if @ssl_ca_file if @ssl_verify_mode require 'openssl' conn.verify_mode = case @ssl_verify_mode when :none then OpenSSL::SSL::VERIFY_NONE when :peer then OpenSSL::SSL::VERIFY_PEER end end conn.cert = @ssl_cert if @ssl_cert conn.key = @ssl_key if @ssl_key conn.ssl_version = @ssl_version if @ssl_version end gsscli = nil if @kerberos require 'base64' require 'gssapi' gsscli = GSSAPI::Simple.new(@host, 'HTTP', @kerberos_keytab) token = nil begin token = gsscli.init_context rescue => e raise WebHDFS::KerberosError, e. end if header header['Authorization'] = "Negotiate #{Base64.strict_encode64(token)}" else header = {'Authorization' => "Negotiate #{Base64.strict_encode64(token)}"} end else header = {} if header.nil? header = @http_headers.merge(header) end res = nil if !payload.nil? and payload.respond_to? :read and payload.respond_to? :size req = Net::HTTPGenericRequest.new(method,(payload ? true : false),true,request_path,header) raise WebHDFS::ClientError, 'Error accepting given IO resource as data payload, Not valid in methods other than PUT and POST' unless (method == 'PUT' or method == 'POST') req.body_stream = payload req.content_length = payload.size begin res = conn.request(req) rescue => e raise WebHDFS::ServerError, "Failed to connect to host #{host}:#{port}, #{e.message}" end else begin res = conn.send_request(method, request_path, payload, header) rescue => e raise WebHDFS::ServerError, "Failed to connect to host #{host}:#{port}, #{e.message}" end end # if delegation token param settled, namenode do not response WWW-Authenticate header if @kerberos and res.code == '307' and not params.key?('delegation') itok = (res.header.get_fields('WWW-Authenticate') || ['']).pop.split(/\s+/).last unless itok raise WebHDFS::KerberosError, 'Server does not return WWW-Authenticate header' end begin gsscli.init_context(Base64.strict_decode64(itok)) rescue => e raise WebHDFS::KerberosError, e. end end case res when Net::HTTPSuccess res when Net::HTTPRedirection res else = if res.body and not res.body.empty? res.body.gsub(/\n/, '') else 'Response body is empty...' end # when delegation token is invalid if res.code == '403' and @renew_kerberos_delegation_token_time_hour && retries < @retry_times if .include?('{"RemoteException":{') detail = JSON.parse() rescue nil if detail&.dig('RemoteException', 'message')&.include?('HDFS_DELEGATION_TOKEN') params = params.merge('token' => get_cached_kerberos_delegation_token(true)) sleep @retry_interval if @retry_interval > 0 return request(host, port, method, path, op, params, payload, header, retries+1) end end end if @retry_known_errors && retries < @retry_times detail = nil if =~ /^\{"RemoteException":\{/ begin detail = JSON.parse() rescue # ignore broken json response body end end if detail && detail['RemoteException'] && KNOWN_ERRORS.include?(detail['RemoteException']['exception']) sleep @retry_interval if @retry_interval > 0 return request(host, port, method, path, op, params, payload, header, retries+1) end end case res.code when '400' raise WebHDFS::ClientError, when '401' raise WebHDFS::SecurityError, when '403' raise WebHDFS::IOError, when '404' raise WebHDFS::FileNotFoundError, when '500' raise WebHDFS::ServerError, else raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{message}" end end end |
#should_kerberos_token_updated? ⇒ Boolean
69 70 71 |
# File 'lib/webhdfs/client_v1.rb', line 69 def should_kerberos_token_updated? @kerberos_token_updated_at + (@renew_kerberos_delegation_token_time_hour * 60 * 60) <= Time.now end |
#stat(path, options = {}) ⇒ Object Also known as: getfilestatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”
158 159 160 161 162 |
# File 'lib/webhdfs/client_v1.rb', line 158 def stat(path, ={}) (, OPT_TABLE['GETFILESTATUS']) res = operate_requests('GET', path, 'GETFILESTATUS', ) check_success_json(res, 'FileStatus') end |
#touch(path, options = {}) ⇒ Object Also known as: settimes
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES
[&modificationtime=<TIME>][&accesstime=<TIME>]"
motidicationtime: radix-10 logn integer accesstime: radix-10 logn integer
240 241 242 243 244 245 246 247 248 |
# File 'lib/webhdfs/client_v1.rb', line 240 def touch(path, ={}) (, OPT_TABLE['SETTIMES']) unless .has_key?('modificationtime') or .has_key?('accesstime') or .has_key?(:modificationtime) or .has_key?(:accesstime) raise ArgumentError, "'chown' needs at least one of modificationtime or accesstime" end res = operate_requests('PUT', path, 'SETTIMES', ) res.code == '200' end |