Class: WebHDFS::ClientV1

Inherits:
Object
  • Object
show all
Defined in:
lib/webhdfs/client_v1.rb

Direct Known Subclasses

Client

Constant Summary collapse

OPT_TABLE =

This hash table holds command options.

{}
REDIRECTED_OPERATIONS =
['APPEND' ,'CREATE' , 'OPEN', 'GETFILECHECKSUM']
REDIRECTED_CODE =
(300..399)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = 'localhost', port = 14000, username = nil, doas = nil) ⇒ ClientV1

Returns a new instance of ClientV1.



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/webhdfs/client_v1.rb', line 24

# Creates a client bound to a server endpoint.
#
# host     - server host name (default 'localhost')
# port     - server port (default 14000)
# username - stored in @username; build_path sends it as 'user.name' when set
# doas     - stored in @doas; build_path sends it as 'doas' when set
def initialize(host='localhost', port=14000, username=nil, doas=nil)
  # Endpoint and identity, exactly as supplied by the caller.
  @host, @port = host, port
  @username, @doas = username, doas

  # Defaults: HttpFs mode off, pseudo authentication, no kerberos credentials.
  @httpfs_mode = false
  @auth_type = :pseudo
  @keytab = @pass = nil
end

Instance Attribute Details

#auth_typeObject

pseudo, kerberos



19
20
21
# File 'lib/webhdfs/client_v1.rb', line 19

# Returns the configured authentication type (@auth_type). initialize sets it
# to :pseudo; request performs kerberos setup only when it equals :kerberos.
def auth_type
  @auth_type
end

#doasObject

internal use only



15
16
17
# File 'lib/webhdfs/client_v1.rb', line 15

# Returns @doas, the value build_path sends as the 'doas' query parameter
# when it is set.
def doas
  @doas
end

#hostObject

internal use only



15
16
17
# File 'lib/webhdfs/client_v1.rb', line 15

# Returns @host, the host given to initialize (default 'localhost').
def host
  @host
end

#httpfs_modeObject

Returns the value of attribute httpfs_mode.



18
19
20
# File 'lib/webhdfs/client_v1.rb', line 18

# Returns @httpfs_mode (false after initialize). When it is true,
# operate_requests sends requests directly instead of following a redirect,
# and create/append add the 'data' => 'true' option.
def httpfs_mode
  @httpfs_mode
end

#keytabObject

Returns the value of attribute keytab.



20
21
22
# File 'lib/webhdfs/client_v1.rb', line 20

# Returns @keytab (nil after initialize). request passes it to
# krb5.get_init_creds_keytab when kerberos authentication is selected.
def keytab
  @keytab
end

#open_timeoutObject

default 30s (in ruby net/http)



16
17
18
# File 'lib/webhdfs/client_v1.rb', line 16

# Returns @open_timeout. request copies it onto the outgoing request only
# when it is set; it is not assigned by initialize.
def open_timeout
  @open_timeout
end

#passObject

Returns the value of attribute pass.



21
22
23
# File 'lib/webhdfs/client_v1.rb', line 21

# Returns @pass (nil after initialize). request passes it to
# krb5.get_init_creds_password when kerberos is selected and no keytab is set.
def pass
  @pass
end

#portObject

internal use only



15
16
17
# File 'lib/webhdfs/client_v1.rb', line 15

# Returns @port, the port given to initialize (default 14000).
def port
  @port
end

#read_timeoutObject

default 60s (in ruby net/http)



17
18
19
# File 'lib/webhdfs/client_v1.rb', line 17

# Returns @read_timeout. request copies it onto the outgoing request only
# when it is set; it is not assigned by initialize.
def read_timeout
  @read_timeout
end

#usernameObject

internal use only



15
16
17
# File 'lib/webhdfs/client_v1.rb', line 15

# Returns @username, the value build_path sends as the 'user.name' query
# parameter when it is set; request also uses it as the kerberos principal.
def username
  @username
end

Instance Method Details

#api_path(path) ⇒ Object



218
219
220
221
222
223
224
# File 'lib/webhdfs/client_v1.rb', line 218

# Prefixes a file-system path with the '/webhdfs/v1' API root, inserting a
# '/' between the two only when the given path does not already start with one.
def api_path(path)
  separator = path.start_with?('/') ? '' : '/'
  '/webhdfs/v1' + separator + path
end

#append(path, body, options = {}) ⇒ Object



63
64
65
66
67
68
69
70
# File 'lib/webhdfs/client_v1.rb', line 63

# Appends body to the file at path using the APPEND operation (HTTP POST).
#
# In HttpFs mode a 'data' => 'true' option is added to a copy of options
# before validation; the caller's hash is not modified.
# Returns true when the response code is 200, false otherwise.
def append(path, body, options={})
  options = options.merge({'data' => 'true'}) if @httpfs_mode
  check_options(options, OPT_TABLE['APPEND'])
  operate_requests(:post, path, 'APPEND', options, body).code == 200
end

#build_path(path, op, params) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/webhdfs/client_v1.rb', line 226

# Builds the request path plus query string for an operation.
#
# The query contains the caller's params followed by 'op', then 'user.name'
# when @username is set, then 'doas' when @doas is set. These three keys
# override same-named entries in params.
def build_path(path, op, params)
  identity = {'op' => op}
  identity['user.name'] = @username if @username
  identity['doas'] = @doas if @doas
  api_path(path) + '?' + URI.encode_www_form(params.merge(identity))
end

#check_options(options, optdecl = []) ⇒ Object

def delegation_token(user, options={}) # GETDELEGATIONTOKEN
  raise NotImplementedError
end

def renew_delegation_token(token, options={}) # RENEWDELEGATIONTOKEN
  raise NotImplementedError
end

def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN
  raise NotImplementedError
end

Raises:

  • (ArgumentError)


209
210
211
212
# File 'lib/webhdfs/client_v1.rb', line 209

# Validates option names against the list declared for an operation.
#
# Option keys are compared as strings, so symbol keys are accepted. A nil
# optdecl is treated as an empty list.
# Raises ArgumentError naming every key that is not declared; returns nil
# when all keys are allowed.
def check_options(options, optdecl=[])
  allowed = optdecl || []
  unknown = options.keys.map(&:to_s).reject { |name| allowed.include?(name) }
  return if unknown.empty?
  raise ArgumentError, "no such option: #{unknown.join(' ')}"
end

#check_success_json(res, attr = nil) ⇒ Object



214
215
216
# File 'lib/webhdfs/client_v1.rb', line 214

# Checks that a response is a successful JSON response and optionally
# extracts one top-level attribute from its body.
#
# res  - response object responding to code, headers and body
# attr - name of a top-level JSON key to return, or nil to only check success
#
# Returns false when the code is not 200 or the Content-Type does not mention
# 'application/json'; true when attr is nil; otherwise the parsed value stored
# under attr (which may itself be false or nil).
def check_success_json(res, attr=nil)
  # A response without a Content-Type header is treated as "not JSON" rather
  # than raising NoMethodError on nil.
  content_type = res.headers['Content-Type'].to_s
  res.code == 200 && content_type.include?('application/json') && (attr.nil? || JSON.parse(res.body)[attr])
end

#checksum(path, options = {}) ⇒ Object Also known as: getfilechecksum

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”



136
137
138
139
140
# File 'lib/webhdfs/client_v1.rb', line 136

# Issues a GETFILECHECKSUM request (HTTP GET) for path.
# Returns whatever check_success_json yields for the 'FileChecksum' attribute.
def checksum(path, options={})
  operation = 'GETFILECHECKSUM'
  check_options(options, OPT_TABLE[operation])
  response = operate_requests(:get, path, operation, options)
  check_success_json(response, 'FileChecksum')
end

#chmod(path, mode, options = {}) ⇒ Object Also known as: setpermission

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION

[&permission=<OCTAL>]"


153
154
155
156
157
# File 'lib/webhdfs/client_v1.rb', line 153

# Issues a SETPERMISSION request (HTTP PUT) for path.
#
# mode is sent as the 'permission' parameter, added to a copy of options
# after the caller's options have been validated.
# Returns true when the response code is 200, false otherwise.
def chmod(path, mode, options={})
  check_options(options, OPT_TABLE['SETPERMISSION'])
  params = options.merge({'permission' => mode})
  operate_requests(:put, path, 'SETPERMISSION', params).code == 200
end

#chown(path, options = {}) ⇒ Object Also known as: setowner

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER

[&owner=<USER>][&group=<GROUP>]"


162
163
164
165
166
167
168
169
170
# File 'lib/webhdfs/client_v1.rb', line 162

# Issues a SETOWNER request (HTTP PUT) for path.
#
# options must contain at least one of owner or group, keyed by string or
# symbol; otherwise ArgumentError is raised.
# Returns true when the response code is 200, false otherwise.
def chown(path, options={})
  check_options(options, OPT_TABLE['SETOWNER'])
  accepted_keys = ['owner', 'group', :owner, :group]
  if accepted_keys.none? { |key| options.has_key?(key) }
    raise ArgumentError, "'chown' needs at least one of owner or group"
  end
  operate_requests(:put, path, 'SETOWNER', options).code == 200
end

#content_summary(path, options = {}) ⇒ Object Also known as: getcontentsummary

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”



128
129
130
131
132
# File 'lib/webhdfs/client_v1.rb', line 128

# Issues a GETCONTENTSUMMARY request (HTTP GET) for path.
# Returns whatever check_success_json yields for the 'ContentSummary' attribute.
def content_summary(path, options={})
  operation = 'GETCONTENTSUMMARY'
  check_options(options, OPT_TABLE[operation])
  response = operate_requests(:get, path, operation, options)
  check_success_json(response, 'ContentSummary')
end

#create(path, body, options = {}) ⇒ Object

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE

[&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
[&permission=<OCTAL>][&buffersize=<INT>]"


52
53
54
55
56
57
58
59
# File 'lib/webhdfs/client_v1.rb', line 52

# Creates the file at path with the given body using the CREATE operation
# (HTTP PUT).
#
# In HttpFs mode a 'data' => 'true' option is added to a copy of options
# before validation; the caller's hash is not modified.
# Returns true when the response code is 201, false otherwise.
def create(path, body, options={})
  options = options.merge({'data' => 'true'}) if @httpfs_mode
  check_options(options, OPT_TABLE['CREATE'])
  operate_requests(:put, path, 'CREATE', options, body).code == 201
end

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATESYMLINK&destination=<PATH>

[&createParent=<true|false>]"


39
40
41
42
43
44
45
46
# File 'lib/webhdfs/client_v1.rb', line 39

# Issues a CREATESYMLINK request (HTTP PUT) for path, pointing at dest.
#
# A leading '/' is prepended to dest when it lacks one; dest is then sent as
# the 'destination' parameter.
# Returns whatever check_success_json yields for the 'boolean' attribute.
def create_symlink(path, dest, options={})
  check_options(options, OPT_TABLE['CREATESYMLINK'])
  dest = '/' + dest unless dest.start_with?('/')
  params = options.merge({'destination' => dest})
  response = operate_requests(:put, path, 'CREATESYMLINK', params)
  check_success_json(response, 'boolean')
end

#delete(path, options = {}) ⇒ Object

curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE

[&recursive=<true|false>]"


104
105
106
107
108
# File 'lib/webhdfs/client_v1.rb', line 104

# Issues a DELETE request (HTTP DELETE) for path.
# Returns whatever check_success_json yields for the 'boolean' attribute.
def delete(path, options={})
  operation = 'DELETE'
  check_options(options, OPT_TABLE[operation])
  response = operate_requests(:delete, path, operation, options)
  check_success_json(response, 'boolean')
end

#homedir(options = {}) ⇒ Object Also known as: gethomedirectory

curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”



144
145
146
147
148
# File 'lib/webhdfs/client_v1.rb', line 144

# Issues a GETHOMEDIRECTORY request (HTTP GET) against the root path '/'.
# Returns whatever check_success_json yields for the 'Path' attribute.
def homedir(options={})
  operation = 'GETHOMEDIRECTORY'
  check_options(options, OPT_TABLE[operation])
  response = operate_requests(:get, '/', operation, options)
  check_success_json(response, 'Path')
end

#list(path, options = {}) ⇒ Object Also known as: liststatus

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”



120
121
122
123
124
# File 'lib/webhdfs/client_v1.rb', line 120

# Issues a LISTSTATUS request (HTTP GET) for path.
#
# Returns the 'FileStatus' entry of the response's 'FileStatuses' attribute.
# When check_success_json reports failure (false or nil), that falsy value is
# returned as-is instead of raising NoMethodError by indexing into it.
def list(path, options={})
  check_options(options, OPT_TABLE['LISTSTATUS'])
  res = operate_requests(:get, path, 'LISTSTATUS', options)
  statuses = check_success_json(res, 'FileStatuses')
  statuses && statuses['FileStatus']
end

#mkdir(path, options = {}) ⇒ Object Also known as: mkdirs

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS[&permission=<OCTAL>]”



84
85
86
87
88
# File 'lib/webhdfs/client_v1.rb', line 84

# Issues a MKDIRS request (HTTP PUT) for path.
# Returns whatever check_success_json yields for the 'boolean' attribute.
def mkdir(path, options={})
  operation = 'MKDIRS'
  check_options(options, OPT_TABLE[operation])
  response = operate_requests(:put, path, operation, options)
  check_success_json(response, 'boolean')
end

#operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/webhdfs/client_v1.rb', line 243

# Dispatches an operation, following the two-step redirect when required.
#
# Outside HttpFs mode, operations listed in REDIRECTED_OPERATIONS are first
# sent to @host/@port without a payload; the response must carry a
# redirection code and a Location header, and the payload is then sent to
# that location as application/octet-stream. All other requests go straight
# to @host/@port; in HttpFs mode a non-nil payload is sent with the
# octet-stream content type.
#
# Raises WebHDFS::RequestFailedError when the first step of a redirected
# operation is not a redirect or has no Location header.
# Returns the response of the last request issued.
def operate_requests(method, path, op, params={}, payload=nil)
  octet_stream = {'Content-Type' => 'application/octet-stream'}

  # Direct path: HttpFs mode, or an operation that is never redirected.
  unless !@httpfs_mode && REDIRECTED_OPERATIONS.include?(op)
    if @httpfs_mode && !payload.nil?
      return request(@host, @port, method, path, op, params, payload, octet_stream)
    end
    return request(@host, @port, method, path, op, params, payload)
  end

  # Step 1: ask the first server where the data should go; no body is sent yet.
  first = request(@host, @port, method, path, op, params, nil)
  location = REDIRECTED_CODE.include?(first.code) && first.headers['Location']
  unless location
    msg = "NameNode returns non-redirection (or without location header), code:#{first.code}, body:#{first.body}."
    raise WebHDFS::RequestFailedError, msg
  end

  # Step 2: replay the request, with the payload, against the redirect target.
  uri = URI.parse(location)
  rpath = uri.query ? uri.path + '?' + uri.query : uri.path
  request(uri.host, uri.port, method, rpath, nil, {}, payload, octet_stream)
end

#read(path, options = {}) ⇒ Object Also known as: open

curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN

[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"


75
76
77
78
79
# File 'lib/webhdfs/client_v1.rb', line 75

# Issues an OPEN request (HTTP GET) for path and returns the response body.
def read(path, options={})
  check_options(options, OPT_TABLE['OPEN'])
  operate_requests(:get, path, 'OPEN', options).body
end

#rename(path, dest, options = {}) ⇒ Object

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”



93
94
95
96
97
98
99
100
# File 'lib/webhdfs/client_v1.rb', line 93

# Issues a RENAME request (HTTP PUT) moving path to dest.
#
# A leading '/' is prepended to dest when it lacks one; dest is then sent as
# the 'destination' parameter.
# Returns whatever check_success_json yields for the 'boolean' attribute.
def rename(path, dest, options={})
  check_options(options, OPT_TABLE['RENAME'])
  dest = '/' + dest unless dest.start_with?('/')
  params = options.merge({'destination' => dest})
  response = operate_requests(:put, path, 'RENAME', params)
  check_success_json(response, 'boolean')
end

#replication(path, replnum, options = {}) ⇒ Object Also known as: setreplication

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION

[&replication=<SHORT>]"


176
177
178
179
180
# File 'lib/webhdfs/client_v1.rb', line 176

# Issues a SETREPLICATION request (HTTP PUT) for path.
#
# replnum is converted with to_s and sent as the 'replication' parameter.
# Returns whatever check_success_json yields for the 'boolean' attribute.
def replication(path, replnum, options={})
  check_options(options, OPT_TABLE['SETREPLICATION'])
  params = options.merge({'replication' => replnum.to_s})
  response = operate_requests(:put, path, 'SETREPLICATION', params)
  check_success_json(response, 'boolean')
end

#request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil) ⇒ Object

Server-side exceptions and the HTTP status they map to: IllegalArgumentException → 400 Bad Request; UnsupportedOperationException → 400 Bad Request; SecurityException → 401 Unauthorized; IOException → 403 Forbidden; FileNotFoundException → 404 Not Found; RuntimeException → 500 Internal Server Error.

Parameters:

  • host (Object)
  • port (Object)
  • method (Object)
  • path (Object)
  • op (Object) (defaults to: nil)
  • params (Object) (defaults to: {})
  • payload (Object) (defaults to: nil)
  • header (Object) (defaults to: nil)


280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/webhdfs/client_v1.rb', line 280

# Sends a single HTTP request through HTTPI and returns the response.
#
# host, port - server to contact
# method     - HTTP method symbol handed to HTTPI.request
# path       - file-system path when op is given (expanded by build_path),
#              otherwise an already complete request path
# op         - operation name, or nil to use path verbatim
# params     - query parameters passed to build_path (ignored when op is nil)
# payload    - request body, or nil
# header     - request headers hash, or nil
#
# Returns the HTTPI response for successful and redirection (3xx) codes.
#
# Raises ArgumentError when kerberos is selected with a username but neither
# a keytab nor a password is configured; WebHDFS::SecurityError when kerberos
# credential initialization raises, or on a 401 response;
# WebHDFS::ClientError (400), WebHDFS::IOError (403),
# WebHDFS::FileNotFoundError (404), WebHDFS::ServerError (500) and
# WebHDFS::RequestFailedError for any other non-success code.
def request(host, port, method, path, op=nil, params={}, payload=nil, header=nil)
  req = HTTPI::Request.new

  # NOTE(review): these are process-wide HTTPI settings, reassigned on every
  # call. The original comment said "disable logging", but the code enables
  # debug-level logging -- confirm which one is intended.
  HTTPI.log = true
  HTTPI.log_level = :debug
  HTTPI.adapter = :net_http # one of [:httpclient, :curb, :net_http]

  if @auth_type == :kerberos && @username
    krb5 = Krb5Auth::Krb5.new
    # Checked outside the rescue below so this configuration error reaches the
    # caller as ArgumentError instead of being rewritten to SecurityError.
    unless @keytab || @pass
      raise ArgumentError, "kerberos authentication requires keytab or password"
    end
    inited = begin
      if @keytab
        krb5.get_init_creds_keytab(@username, @keytab)
      else
        krb5.get_init_creds_password(@username, @pass)
      end
    rescue
      raise WebHDFS::SecurityError, "kerberos initialization is failed"
    end
    if inited
      krb5.cache
      # GSS-Negotiate authentication is sent through the curb adapter.
      HTTPI.adapter = :curb
      req.auth.gssnegotiate
    end
  end

  req.open_timeout = @open_timeout if @open_timeout
  req.read_timeout = @read_timeout if @read_timeout
  request_path = op ? build_path(path, op, params) : path
  req.url = URI::HTTP.build({:host => host, :port => port}) + request_path
  # Per the original author's note, headers and body must be {} rather than nil.
  req.headers = header.nil? ? {} : header
  req.body = payload.nil? ? {} : payload

  res = HTTPI.request(method, req)
  if HTTPI::Response::SuccessfulResponseCodes.include?(res.code)
    res
  elsif REDIRECTED_CODE.include?(res.code)
    res
  else
    message = if res.body and not res.body.empty?
                res.body.gsub(/\n/, '')
              else
                'Response body is empty...'
              end
    case res.code
    when 400
      raise WebHDFS::ClientError, message
    when 401
      raise WebHDFS::SecurityError, message
    when 403
      raise WebHDFS::IOError, message
    when 404
      raise WebHDFS::FileNotFoundError, message
    when 500
      raise WebHDFS::ServerError, message
    else
      raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{message}"
    end
  end
end

#stat(path, options = {}) ⇒ Object Also known as: getfilestatus

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”



112
113
114
115
116
# File 'lib/webhdfs/client_v1.rb', line 112

# Issues a GETFILESTATUS request (HTTP GET) for path.
# Returns whatever check_success_json yields for the 'FileStatus' attribute.
def stat(path, options={})
  operation = 'GETFILESTATUS'
  check_options(options, OPT_TABLE[operation])
  response = operate_requests(:get, path, operation, options)
  check_success_json(response, 'FileStatus')
end

#touch(path, options = {}) ⇒ Object Also known as: settimes

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES

[&modificationtime=<TIME>][&accesstime=<TIME>]"

modificationtime: radix-10 long integer; accesstime: radix-10 long integer



187
188
189
190
191
192
193
194
195
# File 'lib/webhdfs/client_v1.rb', line 187

# Issues a SETTIMES request (HTTP PUT) for path.
#
# options must contain at least one of modificationtime or accesstime, keyed
# by string or symbol; otherwise ArgumentError is raised.
# Returns true when the response code is 200, false otherwise.
def touch(path, options={})
  check_options(options, OPT_TABLE['SETTIMES'])
  unless options.has_key?('modificationtime') or options.has_key?('accesstime') or
      options.has_key?(:modificationtime) or options.has_key?(:accesstime)
    # The message names this method; it previously said 'chown' (copy-paste slip).
    raise ArgumentError, "'touch' needs at least one of modificationtime or accesstime"
  end
  res = operate_requests(:put, path, 'SETTIMES', options)
  res.code == 200
end