Class: WebHDFS::ClientV1
- Inherits:
-
Object
- Object
- WebHDFS::ClientV1
- Defined in:
- lib/webhdfs/client_v1.rb
Direct Known Subclasses
Constant Summary collapse
- OPT_TABLE =
This hash table holds command options.
{}
- KNOWN_ERRORS =
internal use only
['LeaseExpiredException'].freeze
- REDIRECTED_OPERATIONS =
['APPEND', 'CREATE', 'OPEN', 'GETFILECHECKSUM']
Instance Attribute Summary collapse
-
#doas ⇒ Object
Returns the value of attribute doas.
-
#host ⇒ Object
Returns the value of attribute host.
-
#httpfs_mode ⇒ Object
Returns the value of attribute httpfs_mode.
-
#open_timeout ⇒ Object
default 30s (in ruby net/http).
-
#port ⇒ Object
Returns the value of attribute port.
-
#proxy_address ⇒ Object
Returns the value of attribute proxy_address.
-
#proxy_pass ⇒ Object
Returns the value of attribute proxy_pass.
-
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
-
#proxy_user ⇒ Object
Returns the value of attribute proxy_user.
-
#read_timeout ⇒ Object
default 60s (in ruby net/http).
-
#retry_interval ⇒ Object
default 1 ([sec], ignored when retry_known_errors is false).
-
#retry_known_errors ⇒ Object
default false (not to retry).
-
#retry_times ⇒ Object
default 1 (ignored when retry_known_errors is false).
-
#username ⇒ Object
Returns the value of attribute username.
Instance Method Summary collapse
- #api_path(path) ⇒ Object
-
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND”.
- #build_path(path, op, params) ⇒ Object
-
#check_options(options, optdecl = []) ⇒ Object
def delegation_token(user, options={}) # GETDELEGATIONTOKEN raise NotImplementedError end def renew_delegation_token(token, options={}) # RENEWDELEGATIONTOKEN raise NotImplementedError end def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN raise NotImplementedError end.
- #check_success_json(res, attr = nil) ⇒ Object
-
#checksum(path, options = {}) ⇒ Object
(also: #getfilechecksum)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”.
-
#chmod(path, mode, options = {}) ⇒ Object
(also: #setpermission)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION [&permission=<OCTAL>]”.
-
#chown(path, options = {}) ⇒ Object
(also: #setowner)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER [&owner=<USER>]”.
-
#content_summary(path, options = {}) ⇒ Object
(also: #getcontentsummary)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”.
-
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE [&overwrite=<true|false>][&replication=<SHORT>] [&permission=<OCTAL>]”.
-
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE [&recursive=<true|false>]”.
-
#homedir(options = {}) ⇒ Object
(also: #gethomedirectory)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”.
-
#initialize(host = 'localhost', port = 50070, username = nil, doas = nil, proxy_address = nil, proxy_port = nil) ⇒ ClientV1
constructor
A new instance of ClientV1.
-
#list(path, options = {}) ⇒ Object
(also: #liststatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”.
-
#mkdir(path, options = {}) ⇒ Object
(also: #mkdirs)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS”.
- #operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
-
#read(path, options = {}) ⇒ Object
(also: #open)
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN [&offset=<LONG>][&buffersize=<INT>]”.
-
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”.
-
#replication(path, replnum, options = {}) ⇒ Object
(also: #setreplication)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION [&replication=<SHORT>]”.
-
#request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil, retries = 0) ⇒ Object
IllegalArgumentException: 400 Bad Request; UnsupportedOperationException: 400 Bad Request; SecurityException: 401 Unauthorized; IOException: 403 Forbidden; FileNotFoundException: 404 Not Found; RuntimeException: 500 Internal Server Error.
-
#stat(path, options = {}) ⇒ Object
(also: #getfilestatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”.
-
#touch(path, options = {}) ⇒ Object
(also: #settimes)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES [&modificationtime=<TIME>][&accesstime=<TIME>]”. modificationtime: radix-10 long integer; accesstime: radix-10 long integer.
Constructor Details
#initialize(host = 'localhost', port = 50070, username = nil, doas = nil, proxy_address = nil, proxy_port = nil) ⇒ ClientV1
Returns a new instance of ClientV1.
23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/webhdfs/client_v1.rb', line 23
# Builds a new client.
#
# host, port                -- stored as the request target (defaults: 'localhost', 50070)
# username, doas            -- optional identity values (default nil)
# proxy_address, proxy_port -- optional HTTP proxy settings (default nil)
#
# Retrying of known errors starts disabled, with one retry and a one-second
# interval configured; httpfs mode starts disabled.
def initialize(host='localhost', port=50070, username=nil, doas=nil, proxy_address=nil, proxy_port=nil)
  @host = host
  @port = port
  @username = username
  @doas = doas
  @proxy_address = proxy_address
  @proxy_port = proxy_port
  @retry_known_errors = false
  @retry_times = 1
  @retry_interval = 1
  @httpfs_mode = false
end
Instance Attribute Details
#doas ⇒ Object
Returns the value of attribute doas.
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the value of attribute doas.
def doas
  @doas
end
#host ⇒ Object
Returns the value of attribute host.
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the value of attribute host.
def host
  @host
end
#httpfs_mode ⇒ Object
Returns the value of attribute httpfs_mode.
18 19 20 |
# File 'lib/webhdfs/client_v1.rb', line 18
# Returns the value of attribute httpfs_mode.
def httpfs_mode
  @httpfs_mode
end
#open_timeout ⇒ Object
default 30s (in ruby net/http)
16 17 18 |
# File 'lib/webhdfs/client_v1.rb', line 16
# Returns the configured open timeout.
# default 30s (in ruby net/http)
def open_timeout
  @open_timeout
end
#port ⇒ Object
Returns the value of attribute port.
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the value of attribute port.
def port
  @port
end
#proxy_address ⇒ Object
Returns the value of attribute proxy_address.
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the value of attribute proxy_address.
def proxy_address
  @proxy_address
end
#proxy_pass ⇒ Object
Returns the value of attribute proxy_pass.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15
# Returns the value of attribute proxy_pass.
def proxy_pass
  @proxy_pass
end
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the value of attribute proxy_port.
def proxy_port
  @proxy_port
end
#proxy_user ⇒ Object
Returns the value of attribute proxy_user.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15
# Returns the value of attribute proxy_user.
def proxy_user
  @proxy_user
end
#read_timeout ⇒ Object
default 60s (in ruby net/http)
17 18 19 |
# File 'lib/webhdfs/client_v1.rb', line 17
# Returns the configured read timeout.
# default 60s (in ruby net/http)
def read_timeout
  @read_timeout
end
#retry_interval ⇒ Object
default 1 ([sec], ignored when retry_known_errors is false)
21 22 23 |
# File 'lib/webhdfs/client_v1.rb', line 21
# Returns the pause between retries.
# default 1 ([sec], ignored when retry_known_errors is false)
def retry_interval
  @retry_interval
end
#retry_known_errors ⇒ Object
default false (not to retry)
19 20 21 |
# File 'lib/webhdfs/client_v1.rb', line 19
# Returns whether known errors are retried.
# default false (not to retry)
def retry_known_errors
  @retry_known_errors
end
#retry_times ⇒ Object
default 1 (ignored when retry_known_errors is false)
20 21 22 |
# File 'lib/webhdfs/client_v1.rb', line 20
# Returns the maximum number of retries.
# default 1 (ignored when retry_known_errors is false)
def retry_times
  @retry_times
end
#username ⇒ Object
Returns the value of attribute username.
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the value of attribute username.
def username
  @username
end
Instance Method Details
#api_path(path) ⇒ Object
206 207 208 209 210 211 212 |
# File 'lib/webhdfs/client_v1.rb', line 206
# Prefixes +path+ with the REST root '/webhdfs/v1', inserting a '/' separator
# when +path+ does not already start with one.
def api_path(path)
  if path.start_with?('/')
    '/webhdfs/v1' + path
  else
    '/webhdfs/v1/' + path
  end
end
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND”
51 52 53 54 55 56 57 58 |
# File 'lib/webhdfs/client_v1.rb', line 51
# Appends +body+ to the file at +path+ (POST, op=APPEND).
#
# In httpfs mode 'data' => 'true' is merged into a copy of +options+ before
# the options are validated and sent.
#
# Returns true when the response code is '200'.
# Option keys are validated by check_options against OPT_TABLE['APPEND'].
def append(path, body, options={})
  if @httpfs_mode
    options = options.merge({'data' => 'true'})
  end
  check_options(options, OPT_TABLE['APPEND'])
  res = operate_requests('POST', path, 'APPEND', options, body)
  res.code == '200'
end
#build_path(path, op, params) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/webhdfs/client_v1.rb', line 214
# Builds the request path plus query string for operation +op+ on +path+.
#
# The query always carries 'op'; 'user.name' and 'doas' are added only when
# @username / @doas are set. These entries are merged over +params+, so they
# win on key collisions. The query is form-encoded and appended to
# api_path(path).
def build_path(path, op, params)
  opts = {'op' => op}
  opts['user.name'] = @username if @username
  opts['doas'] = @doas if @doas
  query = URI.encode_www_form(params.merge(opts))
  api_path(path) + '?' + query
end
#check_options(options, optdecl = []) ⇒ Object
The source comment attached to this method contains unimplemented delegation-token stubs:
def delegation_token(user, options={}) # GETDELEGATIONTOKEN
raise NotImplementedError
end
def renew_delegation_token(token, options={}) # RENEWDELEGATIONTOKEN
raise NotImplementedError
end
def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN
raise NotImplementedError
end
197 198 199 200 |
# File 'lib/webhdfs/client_v1.rb', line 197
# Validates the keys of +options+ against the declared option names.
#
# options -- Hash of request options; keys are compared by their to_s form,
#            so symbol and string keys are treated alike
# optdecl -- Array of allowed option names (strings); nil is treated as empty
#
# Returns nil when every key is declared.
# Raises ArgumentError naming the undeclared keys otherwise.
def check_options(options, optdecl=[])
  ex = options.keys.map(&:to_s) - (optdecl || [])
  raise ArgumentError, "no such option: #{ex.join(' ')}" unless ex.empty?
end
#check_success_json(res, attr = nil) ⇒ Object
202 203 204 |
# File 'lib/webhdfs/client_v1.rb', line 202
# Checks that +res+ is a successful JSON response and optionally extracts one
# top-level attribute from its body.
#
# res  -- response object answering #code, #content_type and #body
# attr -- top-level JSON key to extract, or nil to only check the response
#
# Returns false unless the code is '200' and the content type is
# 'application/json'. Otherwise returns true when +attr+ is nil, or the
# parsed body's value for +attr+ (which may itself be nil or false).
def check_success_json(res, attr=nil)
  res.code == '200' &&
    res.content_type == 'application/json' &&
    (attr.nil? || JSON.parse(res.body)[attr])
end
#checksum(path, options = {}) ⇒ Object Also known as: getfilechecksum
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”
124 125 126 127 128 |
# File 'lib/webhdfs/client_v1.rb', line 124
# Requests the checksum of the file at +path+ (GET, op=GETFILECHECKSUM).
#
# Returns the 'FileChecksum' value of the JSON response, or a falsy value
# when check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['GETFILECHECKSUM'].
def checksum(path, options={})
  check_options(options, OPT_TABLE['GETFILECHECKSUM'])
  res = operate_requests('GET', path, 'GETFILECHECKSUM', options)
  check_success_json(res, 'FileChecksum')
end
#chmod(path, mode, options = {}) ⇒ Object Also known as: setpermission
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
[&permission=<OCTAL>]"
141 142 143 144 145 |
# File 'lib/webhdfs/client_v1.rb', line 141
# Sets the permission of +path+ to +mode+ (PUT, op=SETPERMISSION).
#
# +mode+ is sent as the 'permission' parameter, merged over +options+.
# Returns true when the response code is '200'.
# Option keys are validated against OPT_TABLE['SETPERMISSION'].
def chmod(path, mode, options={})
  check_options(options, OPT_TABLE['SETPERMISSION'])
  res = operate_requests('PUT', path, 'SETPERMISSION', options.merge({'permission' => mode}))
  res.code == '200'
end
#chown(path, options = {}) ⇒ Object Also known as: setowner
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
[&owner=<USER>][&group=<GROUP>]"
150 151 152 153 154 155 156 157 158 |
# File 'lib/webhdfs/client_v1.rb', line 150
# Changes the owner and/or group of +path+ (PUT, op=SETOWNER).
#
# +options+ must carry 'owner' or 'group', as a string or symbol key.
# Returns true when the response code is '200'.
# Raises ArgumentError when neither owner nor group is given.
# Option keys are validated against OPT_TABLE['SETOWNER'].
def chown(path, options={})
  check_options(options, OPT_TABLE['SETOWNER'])
  unless options.has_key?('owner') or options.has_key?('group') or
         options.has_key?(:owner) or options.has_key?(:group)
    raise ArgumentError, "'chown' needs at least one of owner or group"
  end
  res = operate_requests('PUT', path, 'SETOWNER', options)
  res.code == '200'
end
#content_summary(path, options = {}) ⇒ Object Also known as: getcontentsummary
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”
116 117 118 119 120 |
# File 'lib/webhdfs/client_v1.rb', line 116
# Requests the content summary of +path+ (GET, op=GETCONTENTSUMMARY).
#
# Returns the 'ContentSummary' value of the JSON response, or a falsy value
# when check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['GETCONTENTSUMMARY'].
def content_summary(path, options={})
  check_options(options, OPT_TABLE['GETCONTENTSUMMARY'])
  res = operate_requests('GET', path, 'GETCONTENTSUMMARY', options)
  check_success_json(res, 'ContentSummary')
end
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
[&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
[&permission=<OCTAL>][&buffersize=<INT>]"
40 41 42 43 44 45 46 47 |
# File 'lib/webhdfs/client_v1.rb', line 40
# Creates the file at +path+ with content +body+ (PUT, op=CREATE).
#
# In httpfs mode 'data' => 'true' is merged into a copy of +options+ before
# the options are validated and sent.
#
# Returns true when the response code is '201'.
# Option keys are validated against OPT_TABLE['CREATE'].
def create(path, body, options={})
  if @httpfs_mode
    options = options.merge({'data' => 'true'})
  end
  check_options(options, OPT_TABLE['CREATE'])
  res = operate_requests('PUT', path, 'CREATE', options, body)
  res.code == '201'
end
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
[&recursive=<true|false>]"
92 93 94 95 96 |
# File 'lib/webhdfs/client_v1.rb', line 92
# Deletes +path+ (DELETE, op=DELETE).
#
# Returns the 'boolean' value of the JSON response, or a falsy value when
# check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['DELETE'].
def delete(path, options={})
  check_options(options, OPT_TABLE['DELETE'])
  res = operate_requests('DELETE', path, 'DELETE', options)
  check_success_json(res, 'boolean')
end
#homedir(options = {}) ⇒ Object Also known as: gethomedirectory
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”
132 133 134 135 136 |
# File 'lib/webhdfs/client_v1.rb', line 132
# Requests the home directory (GET on '/', op=GETHOMEDIRECTORY).
#
# Returns the 'Path' value of the JSON response, or a falsy value when
# check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['GETHOMEDIRECTORY'].
def homedir(options={})
  check_options(options, OPT_TABLE['GETHOMEDIRECTORY'])
  res = operate_requests('GET', '/', 'GETHOMEDIRECTORY', options)
  check_success_json(res, 'Path')
end
#list(path, options = {}) ⇒ Object Also known as: liststatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”
108 109 110 111 112 |
# File 'lib/webhdfs/client_v1.rb', line 108
# Lists the entries under +path+ (GET, op=LISTSTATUS).
#
# Returns the 'FileStatus' value nested under 'FileStatuses' in the JSON
# response, or a falsy value when check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['LISTSTATUS'].
def list(path, options={})
  check_options(options, OPT_TABLE['LISTSTATUS'])
  res = operate_requests('GET', path, 'LISTSTATUS', options)
  statuses = check_success_json(res, 'FileStatuses')
  # check_success_json yields false/nil for a rejected response; indexing that
  # directly would raise NoMethodError, so pass the falsy value through instead.
  statuses && statuses['FileStatus']
end
#mkdir(path, options = {}) ⇒ Object Also known as: mkdirs
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS”
72 73 74 75 76 |
# File 'lib/webhdfs/client_v1.rb', line 72
# Creates the directory +path+ (PUT, op=MKDIRS).
#
# Returns the 'boolean' value of the JSON response, or a falsy value when
# check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['MKDIRS'].
def mkdir(path, options={})
  check_options(options, OPT_TABLE['MKDIRS'])
  res = operate_requests('PUT', path, 'MKDIRS', options)
  check_success_json(res, 'boolean')
end
#operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/webhdfs/client_v1.rb', line 229
# Sends operation +op+ and returns the final response.
#
# Outside httpfs mode, operations listed in REDIRECTED_OPERATIONS take two
# steps: the first request goes to @host:@port without a payload and must
# answer with a redirection carrying a 'location' header; the second request
# goes to that location with +payload+ and an octet-stream content type.
#
# All other cases send a single request to @host:@port. In httpfs mode a
# non-nil +payload+ is sent with an octet-stream content type.
#
# Raises WebHDFS::RequestFailedError when the first step of a redirected
# operation is not a redirection or has no 'location' header.
def operate_requests(method, path, op, params={}, payload=nil)
  if not @httpfs_mode and REDIRECTED_OPERATIONS.include?(op)
    # Step 1: no payload is sent here; only the redirect target is wanted.
    res = request(@host, @port, method, path, op, params, nil)
    unless res.is_a?(Net::HTTPRedirection) and res['location']
      msg = "NameNode returns non-redirection (or without location header), code:#{res.code}, body:#{res.body}."
      raise WebHDFS::RequestFailedError, msg
    end
    uri = URI.parse(res['location'])
    rpath = if uri.query
              uri.path + '?' + uri.query
            else
              uri.path
            end
    # Step 2: op is nil so that request uses rpath (which already carries the
    # query string from the redirect) verbatim.
    request(uri.host, uri.port, method, rpath, nil, {}, payload, {'Content-Type' => 'application/octet-stream'})
  else
    if @httpfs_mode and not payload.nil?
      request(@host, @port, method, path, op, params, payload, {'Content-Type' => 'application/octet-stream'})
    else
      request(@host, @port, method, path, op, params, payload)
    end
  end
end
#read(path, options = {}) ⇒ Object Also known as: open
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
63 64 65 66 67 |
# File 'lib/webhdfs/client_v1.rb', line 63
# Reads the file at +path+ (GET, op=OPEN).
#
# Returns the body of the final response.
# Option keys are validated against OPT_TABLE['OPEN'].
def read(path, options={})
  check_options(options, OPT_TABLE['OPEN'])
  res = operate_requests('GET', path, 'OPEN', options)
  res.body
end
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”
81 82 83 84 85 86 87 88 |
# File 'lib/webhdfs/client_v1.rb', line 81
# Renames +path+ to +dest+ (PUT, op=RENAME).
#
# A +dest+ without a leading '/' gets one prepended before it is sent as the
# 'destination' parameter, merged over +options+.
#
# Returns the 'boolean' value of the JSON response, or a falsy value when
# check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['RENAME'].
def rename(path, dest, options={})
  check_options(options, OPT_TABLE['RENAME'])
  unless dest.start_with?('/')
    dest = '/' + dest
  end
  res = operate_requests('PUT', path, 'RENAME', options.merge({'destination' => dest}))
  check_success_json(res, 'boolean')
end
#replication(path, replnum, options = {}) ⇒ Object Also known as: setreplication
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
[&replication=<SHORT>]"
164 165 166 167 168 |
# File 'lib/webhdfs/client_v1.rb', line 164
# Sets the replication factor of +path+ (PUT, op=SETREPLICATION).
#
# +replnum+ is converted with to_s and sent as the 'replication' parameter,
# merged over +options+.
#
# Returns the 'boolean' value of the JSON response, or a falsy value when
# check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['SETREPLICATION'].
def replication(path, replnum, options={})
  check_options(options, OPT_TABLE['SETREPLICATION'])
  res = operate_requests('PUT', path, 'SETREPLICATION', options.merge({'replication' => replnum.to_s}))
  check_success_json(res, 'boolean')
end
#request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil, retries = 0) ⇒ Object
IllegalArgumentException: 400 Bad Request; UnsupportedOperationException: 400 Bad Request; SecurityException: 401 Unauthorized; IOException: 403 Forbidden; FileNotFoundException: 404 Not Found; RuntimeException: 500 Internal Server Error
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 |
# File 'lib/webhdfs/client_v1.rb', line 258
# Performs one HTTP request and maps error responses to WebHDFS exceptions.
#
# host, port -- target of this request
# method     -- HTTP method name, e.g. 'GET', 'PUT', 'POST', 'DELETE'
# path       -- resource path; used verbatim when +op+ is nil
# op, params -- when +op+ is given, the request path is build_path(path, op, params)
# payload    -- request body; an IO is streamed (PUT/POST only), anything
#               else is passed to Net::HTTP#send_request as-is
# header     -- optional request headers
# retries    -- number of retries already performed (internal)
#
# Returns the response for Net::HTTPSuccess and Net::HTTPRedirection.
#
# When @retry_known_errors is set and fewer than @retry_times retries have
# been made, an error body whose RemoteException names an entry of
# KNOWN_ERRORS triggers a retry after @retry_interval seconds.
#
# Raises by response code:
#   '400' WebHDFS::ClientError        '401' WebHDFS::SecurityError
#   '403' WebHDFS::IOError            '404' WebHDFS::FileNotFoundError
#   '500' WebHDFS::ServerError        other WebHDFS::RequestFailedError
# Also raises WebHDFS::IOError / WebHDFS::ClientError for an unusable IO payload.
def request(host, port, method, path, op=nil, params={}, payload=nil, header=nil, retries=0)
  conn = Net::HTTP.new(host, port, @proxy_address, @proxy_port)
  conn.proxy_user = @proxy_user if @proxy_user
  conn.proxy_pass = @proxy_pass if @proxy_pass
  conn.open_timeout = @open_timeout if @open_timeout
  conn.read_timeout = @read_timeout if @read_timeout

  request_path = if op
                   build_path(path, op, params)
                 else
                   path
                 end

  res = nil
  if !payload.nil? and payload.is_a?(IO)
    # Stream IO payloads instead of loading them into memory.
    req = Net::HTTPGenericRequest.new(method,(payload ? true : false),true,request_path,header)
    raise WebHDFS::IOError, 'Error reading given IO data source' unless payload.respond_to? :read and payload.respond_to? :size
    raise WebHDFS::ClientError, 'Error accepting given IO resource as data payload, Not valid in methods other than PUT and POST' unless (method == 'PUT' or method == 'POST')

    req.body_stream = payload
    req.content_length = payload.size
    res = conn.request(req)
  else
    res = conn.send_request(method, request_path, payload, header)
  end

  case res
  when Net::HTTPSuccess
    res
  when Net::HTTPRedirection
    res
  else
    message = if res.body and not res.body.empty?
                res.body.gsub(/\n/, '')
              else
                'Response body is empty...'
              end

    if @retry_known_errors && retries < @retry_times
      detail = nil
      if message =~ /^\{"RemoteException":\{/
        begin
          detail = JSON.parse(message)
        rescue JSON::ParserError
          # ignore broken json response body
        end
      end
      if detail && detail['RemoteException'] && KNOWN_ERRORS.include?(detail['RemoteException']['exception'])
        sleep @retry_interval if @retry_interval > 0
        return request(host, port, method, path, op, params, payload, header, retries+1)
      end
    end

    case res.code
    when '400'
      raise WebHDFS::ClientError, message
    when '401'
      raise WebHDFS::SecurityError, message
    when '403'
      raise WebHDFS::IOError, message
    when '404'
      raise WebHDFS::FileNotFoundError, message
    when '500'
      raise WebHDFS::ServerError, message
    else
      raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{message}"
    end
  end
end
#stat(path, options = {}) ⇒ Object Also known as: getfilestatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”
100 101 102 103 104 |
# File 'lib/webhdfs/client_v1.rb', line 100
# Requests the status of +path+ (GET, op=GETFILESTATUS).
#
# Returns the 'FileStatus' value of the JSON response, or a falsy value when
# check_success_json rejects the response.
# Option keys are validated against OPT_TABLE['GETFILESTATUS'].
def stat(path, options={})
  check_options(options, OPT_TABLE['GETFILESTATUS'])
  res = operate_requests('GET', path, 'GETFILESTATUS', options)
  check_success_json(res, 'FileStatus')
end
#touch(path, options = {}) ⇒ Object Also known as: settimes
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES
[&modificationtime=<TIME>][&accesstime=<TIME>]"
modificationtime: radix-10 long integer; accesstime: radix-10 long integer
175 176 177 178 179 180 181 182 183 |
# File 'lib/webhdfs/client_v1.rb', line 175
# Sets the modification and/or access time of +path+ (PUT, op=SETTIMES).
#
# +options+ must carry 'modificationtime' or 'accesstime', as a string or
# symbol key.
#
# Returns true when the response code is '200'.
# Raises ArgumentError when neither time is given.
# Option keys are validated against OPT_TABLE['SETTIMES'].
def touch(path, options={})
  check_options(options, OPT_TABLE['SETTIMES'])
  unless options.has_key?('modificationtime') or options.has_key?('accesstime') or
         options.has_key?(:modificationtime) or options.has_key?(:accesstime)
    # The message previously named 'chown', copied from that method.
    raise ArgumentError, "'touch' needs at least one of modificationtime or accesstime"
  end
  res = operate_requests('PUT', path, 'SETTIMES', options)
  res.code == '200'
end