Class: WebHDFS::ClientV1

Inherits:
Object
  • Object
show all
Defined in:
lib/webhdfs/client_v1.rb

Direct Known Subclasses

Client

Constant Summary collapse

OPT_TABLE =

This hash table holds command options.

{}
# WebHDFS operations for which operate_requests expects the first response to be
# an HTTP redirection and then re-sends the request to the redirect Location.
# Frozen so the shared constant cannot be mutated at runtime.
REDIRECTED_OPERATIONS = %w[APPEND CREATE OPEN GETFILECHECKSUM].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = 'localhost', port = 50070, username = nil, doas = nil) ⇒ ClientV1

Returns a new instance of ClientV1.



16
17
18
19
20
21
# File 'lib/webhdfs/client_v1.rb', line 16

# Stores the connection settings used by every subsequent request.
#
# @param host [String] host contacted for the first hop of each request (default 'localhost')
# @param port [Integer] port contacted for the first hop of each request (default 50070)
# @param username [String, nil] sent as the `user.name` query parameter when set
# @param doas [String, nil] sent as the `doas` query parameter when set
def initialize(host='localhost', port=50070, username=nil, doas=nil)
  @host, @port = host, port
  @username, @doas = username, doas
end

Instance Attribute Details

#doasObject

internal use only



13
14
15
# File 'lib/webhdfs/client_v1.rb', line 13

# Returns the value given as `doas` to the constructor (nil when none was given).
def doas; @doas; end

#hostObject

internal use only



13
14
15
# File 'lib/webhdfs/client_v1.rb', line 13

# Returns the host this client sends its first request of every operation to.
def host; @host; end

#open_timeoutObject

Returns the value of attribute open_timeout.



14
15
16
# File 'lib/webhdfs/client_v1.rb', line 14

# Returns the connection-open timeout applied to each HTTP connection, or nil
# when unset (in which case Net::HTTP's own default is kept).
def open_timeout; @open_timeout; end

#portObject

internal use only



13
14
15
# File 'lib/webhdfs/client_v1.rb', line 13

# Returns the port this client sends its first request of every operation to.
def port; @port; end

#read_timeoutObject

Returns the value of attribute read_timeout.



14
15
16
# File 'lib/webhdfs/client_v1.rb', line 14

# Returns the read timeout applied to each HTTP connection, or nil when unset
# (in which case Net::HTTP's own default is kept).
def read_timeout; @read_timeout; end

#usernameObject

internal use only



13
14
15
# File 'lib/webhdfs/client_v1.rb', line 13

# Returns the value sent as the `user.name` query parameter (nil when not configured).
def username; @username; end

Instance Method Details

#api_path(path) ⇒ Object



184
185
186
187
188
189
190
# File 'lib/webhdfs/client_v1.rb', line 184

# Prefixes an HDFS path with the WebHDFS v1 REST root.
#
# Exactly one '/' separates the root from +path+, whether or not +path+ is
# already absolute.
#
# @param path [String] HDFS path, absolute or relative
# @return [String] e.g. '/webhdfs/v1/user/alice' for 'user/alice' or '/user/alice'
def api_path(path)
  separator = path.start_with?('/') ? '' : '/'
  "/webhdfs/v1#{separator}#{path}"
end

#append(path, body, options = {}) ⇒ Object



34
35
36
37
38
# File 'lib/webhdfs/client_v1.rb', line 34

# Sends +body+ to +path+ with the WebHDFS APPEND operation (HTTP POST).
#
# @param path [String] HDFS path of the target file
# @param body [String] data sent as the request payload
# @param options [Hash] extra query parameters, validated against OPT_TABLE['APPEND']
# @return [Boolean] true when the final response code is 200
def append(path, body, options={})
  operation = 'APPEND'
  check_options(options, OPT_TABLE[operation])
  operate_requests('POST', path, operation, options, body).code == '200'
end

#build_path(path, op, params) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/webhdfs/client_v1.rb', line 192

# Builds the request path (path plus query string) for a WebHDFS operation.
#
# Query parameters are the caller's +params+ followed by `op`, then
# `user.name` (when a username is configured) and `doas` (when a doas user is
# configured); those three override same-named keys in +params+.
#
# @param path [String] HDFS path
# @param op [String] WebHDFS operation name
# @param params [Hash] additional query parameters
# @return [String] e.g. '/webhdfs/v1/f?offset=5&op=OPEN&user.name=alice'
def build_path(path, op, params)
  identity = {'op' => op}
  identity['user.name'] = @username if @username
  identity['doas'] = @doas if @doas

  query = URI.encode_www_form(params.merge(identity))
  "#{api_path(path)}?#{query}"
end

#check_options(options, optdecl = []) ⇒ Object

# Delegation-token operations are not implemented by this client; each of the
# stubs below raises NotImplementedError.

# GETDELEGATIONTOKEN
def delegation_token(user, options={})
  raise NotImplementedError
end

# RENEWDELEGATIONTOKEN
def renew_delegation_token(token, options={})
  raise NotImplementedError
end

# CANCELDELEGATIONTOKEN
def cancel_delegation_token(token, options={})
  raise NotImplementedError
end

Raises:

  • (ArgumentError)


175
176
177
178
# File 'lib/webhdfs/client_v1.rb', line 175

# Rejects option keys that are not declared for an operation.
#
# Keys are compared with ==, so a Symbol key never matches a String declaration.
#
# @param options [Hash] caller-supplied options
# @param optdecl [Array, nil] permitted option keys; nil is treated as "none permitted"
# @return [nil]
# @raise [ArgumentError] listing every undeclared key, separated by spaces
def check_options(options, optdecl=[])
  unknown = options.keys - (optdecl || [])
  return if unknown.empty?

  # `unknown` is an Array of keys; the previous `ex.keys` raised NoMethodError
  # instead of the intended ArgumentError.
  raise ArgumentError, "no such option: #{unknown.join(' ')}"
end

#check_success_json(res, attr = nil) ⇒ Object



180
181
182
# File 'lib/webhdfs/client_v1.rb', line 180

# Tests whether +res+ is a successful JSON reply and optionally extracts a field.
#
# @param res [#code, #content_type, #body] HTTP response
# @param attr [String, nil] top-level JSON key to extract
# @return [false] when the code is not '200' or the content type is not 'application/json'
# @return [true] when successful and +attr+ is nil
# @return [Object] otherwise, the parsed JSON value under +attr+ (nil if absent)
def check_success_json(res, attr=nil)
  return false unless res.code == '200'
  return false unless res.content_type == 'application/json'
  return true if attr.nil?

  JSON.parse(res.body)[attr]
end

#checksum(path, options = {}) ⇒ Object Also known as: getfilechecksum

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”



104
105
106
107
108
# File 'lib/webhdfs/client_v1.rb', line 104

# Fetches the GETFILECHECKSUM result for +path+.
#
# @param path [String] HDFS path of the file
# @param options [Hash] extra query parameters, validated against OPT_TABLE['GETFILECHECKSUM']
# @return the parsed 'FileChecksum' JSON value, or a falsy value when the reply
#   is not a successful JSON response (see check_success_json)
def checksum(path, options={})
  operation = 'GETFILECHECKSUM'
  check_options(options, OPT_TABLE[operation])
  check_success_json(operate_requests('GET', path, operation, options), 'FileChecksum')
end

#chmod(path, mode, options = {}) ⇒ Object Also known as: setpermission

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION

[&permission=<OCTAL>]"


121
122
123
124
125
# File 'lib/webhdfs/client_v1.rb', line 121

# Sets the permission of +path+ (WebHDFS SETPERMISSION, HTTP PUT).
#
# @param path [String] HDFS path
# @param mode [Object] sent unchanged as the `permission` query parameter (e.g. '755')
# @param options [Hash] extra query parameters, validated against OPT_TABLE['SETPERMISSION'];
#   not mutated
# @return [Boolean] true when the final response code is 200
def chmod(path, mode, options={})
  operation = 'SETPERMISSION'
  check_options(options, OPT_TABLE[operation])
  query = options.merge('permission' => mode)
  operate_requests('PUT', path, operation, query).code == '200'
end

#chown(path, options = {}) ⇒ Object Also known as: setowner

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER

[&owner=<USER>][&group=<GROUP>]"


130
131
132
133
134
135
136
137
# File 'lib/webhdfs/client_v1.rb', line 130

# Changes the owner and/or group of +path+ (WebHDFS SETOWNER, HTTP PUT).
#
# @param path [String] HDFS path
# @param options [Hash] must contain 'owner' and/or 'group'; validated against
#   OPT_TABLE['SETOWNER']
# @return [Boolean] true when the final response code is 200
# @raise [ArgumentError] when neither 'owner' nor 'group' is given
def chown(path, options={})
  check_options(options, OPT_TABLE['SETOWNER'])
  if %w[owner group].none? { |key| options.key?(key) }
    raise ArgumentError, "'chown' needs at least one of owner or group"
  end

  operate_requests('PUT', path, 'SETOWNER', options).code == '200'
end

#content_summary(path, options = {}) ⇒ Object Also known as: getcontentsummary

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”



96
97
98
99
100
# File 'lib/webhdfs/client_v1.rb', line 96

# Fetches the GETCONTENTSUMMARY result for +path+.
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters, validated against OPT_TABLE['GETCONTENTSUMMARY']
# @return the parsed 'ContentSummary' JSON value, or a falsy value when the reply
#   is not a successful JSON response (see check_success_json)
def content_summary(path, options={})
  operation = 'GETCONTENTSUMMARY'
  check_options(options, OPT_TABLE[operation])
  check_success_json(operate_requests('GET', path, operation, options), 'ContentSummary')
end

#create(path, body, options = {}) ⇒ Object

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE

[&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
[&permission=<OCTAL>][&buffersize=<INT>]"


26
27
28
29
30
# File 'lib/webhdfs/client_v1.rb', line 26

# Writes +body+ to +path+ with the WebHDFS CREATE operation (HTTP PUT).
#
# @param path [String] HDFS path of the file
# @param body [String] data sent as the request payload
# @param options [Hash] extra query parameters, validated against OPT_TABLE['CREATE']
# @return [Boolean] true when the final response code is 201
def create(path, body, options={})
  operation = 'CREATE'
  check_options(options, OPT_TABLE[operation])
  operate_requests('PUT', path, operation, options, body).code == '201'
end

#delete(path, options = {}) ⇒ Object

curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE

[&recursive=<true|false>]"


72
73
74
75
76
# File 'lib/webhdfs/client_v1.rb', line 72

# Deletes +path+ (WebHDFS DELETE, HTTP DELETE).
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters, validated against OPT_TABLE['DELETE']
# @return the parsed 'boolean' JSON value, or a falsy value when the reply is not
#   a successful JSON response (see check_success_json)
def delete(path, options={})
  operation = 'DELETE'
  check_options(options, OPT_TABLE[operation])
  check_success_json(operate_requests('DELETE', path, operation, options), 'boolean')
end

#homedir(options = {}) ⇒ Object Also known as: gethomedirectory

curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”



112
113
114
115
116
# File 'lib/webhdfs/client_v1.rb', line 112

# Fetches the GETHOMEDIRECTORY result, issued against the root path '/'.
#
# @param options [Hash] extra query parameters, validated against OPT_TABLE['GETHOMEDIRECTORY']
# @return the parsed 'Path' JSON value, or a falsy value when the reply is not a
#   successful JSON response (see check_success_json)
def homedir(options={})
  operation = 'GETHOMEDIRECTORY'
  check_options(options, OPT_TABLE[operation])
  check_success_json(operate_requests('GET', '/', operation, options), 'Path')
end

#list(path, options = {}) ⇒ Object Also known as: liststatus

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”



88
89
90
91
92
# File 'lib/webhdfs/client_v1.rb', line 88

# Lists the entries under +path+ (WebHDFS LISTSTATUS, HTTP GET).
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters, validated against OPT_TABLE['LISTSTATUS']
# @return the 'FileStatus' value inside the 'FileStatuses' JSON object, or the falsy
#   value from check_success_json when the reply is not a successful JSON response
def list(path, options={})
  check_options(options, OPT_TABLE['LISTSTATUS'])
  res = operate_requests('GET', path, 'LISTSTATUS', options)
  statuses = check_success_json(res, 'FileStatuses')
  # check_success_json yields false/nil on an unsuccessful reply; indexing that
  # directly raised NoMethodError. Propagate the falsy value instead, matching
  # the other query methods.
  statuses && statuses['FileStatus']
end

#mkdir(path, options = {}) ⇒ Object Also known as: mkdirs

curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS"



52
53
54
55
56
# File 'lib/webhdfs/client_v1.rb', line 52

# Creates the directory +path+ (WebHDFS MKDIRS, HTTP PUT).
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters, validated against OPT_TABLE['MKDIRS']
# @return the parsed 'boolean' JSON value, or a falsy value when the reply is not
#   a successful JSON response (see check_success_json)
def mkdir(path, options={})
  operation = 'MKDIRS'
  check_options(options, OPT_TABLE[operation])
  check_success_json(operate_requests('PUT', path, operation, options), 'boolean')
end

#operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/webhdfs/client_v1.rb', line 207

# Performs a WebHDFS operation, following one redirect for the operations
# listed in REDIRECTED_OPERATIONS.
#
# For a redirected operation, the first request goes to @host:@port without a
# body. Its response must be an HTTP redirection carrying a Location header.
# The second request, which carries +payload+, is then sent to that Location's
# host, port, path and query verbatim. Every other operation is a single
# request to @host:@port, and +payload+ is not sent.
#
# @param method [String] HTTP verb
# @param path [String] HDFS path
# @param op [String] WebHDFS operation name
# @param params [Hash] extra query parameters for the first request
# @param payload [String, nil] body for the redirected (second) request
# @return the response of the last request made
# @raise [WebHDFS::RequestFailedError] when a redirected operation is not answered
#   with a redirection that has a Location header
def operate_requests(method, path, op, params={}, payload=nil)
  return request(@host, @port, method, path, op, params, nil) unless REDIRECTED_OPERATIONS.include?(op)

  first = request(@host, @port, method, path, op, params, nil)
  location = first.is_a?(Net::HTTPRedirection) ? first['location'] : nil
  unless location
    msg = "NameNode returns non-redirection (or without location header), code:#{first.code}, body:#{first.body}."
    raise WebHDFS::RequestFailedError, msg
  end

  target = URI.parse(location)
  target_path = target.query ? "#{target.path}?#{target.query}" : target.path
  request(target.host, target.port, method, target_path, nil, {}, payload)
end

#read(path, options = {}) ⇒ Object Also known as: open

curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN

[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"


43
44
45
46
47
# File 'lib/webhdfs/client_v1.rb', line 43

# Reads the contents of +path+ (WebHDFS OPEN, HTTP GET).
#
# @param path [String] HDFS path of the file
# @param options [Hash] extra query parameters, validated against OPT_TABLE['OPEN']
# @return [String] the body of the final response
def read(path, options={})
  operation = 'OPEN'
  check_options(options, OPT_TABLE[operation])
  operate_requests('GET', path, operation, options).body
end

#rename(path, dest, options = {}) ⇒ Object

curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"



61
62
63
64
65
66
67
68
# File 'lib/webhdfs/client_v1.rb', line 61

# Renames +path+ to +dest+ (WebHDFS RENAME, HTTP PUT).
#
# A relative +dest+ is made absolute by prefixing '/'; the caller's string is
# not modified.
#
# @param path [String] current HDFS path
# @param dest [String] new HDFS path
# @param options [Hash] extra query parameters, validated against OPT_TABLE['RENAME']
# @return the parsed 'boolean' JSON value, or a falsy value when the reply is not
#   a successful JSON response (see check_success_json)
def rename(path, dest, options={})
  check_options(options, OPT_TABLE['RENAME'])
  absolute_dest = dest.start_with?('/') ? dest : '/' + dest
  query = options.merge('destination' => absolute_dest)
  check_success_json(operate_requests('PUT', path, 'RENAME', query), 'boolean')
end

#replication(path, replnum, options = {}) ⇒ Object Also known as: setreplication

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION

[&replication=<SHORT>]"


143
144
145
146
147
# File 'lib/webhdfs/client_v1.rb', line 143

# Sets the replication factor of +path+ (WebHDFS SETREPLICATION, HTTP PUT).
#
# @param path [String] HDFS path of the file
# @param replnum [#to_s] replication factor, sent as its string form
# @param options [Hash] extra query parameters, validated against OPT_TABLE['SETREPLICATION']
# @return the parsed 'boolean' JSON value, or a falsy value when the reply is not
#   a successful JSON response (see check_success_json)
def replication(path, replnum, options={})
  operation = 'SETREPLICATION'
  check_options(options, OPT_TABLE[operation])
  query = options.merge('replication' => replnum.to_s)
  check_success_json(operate_requests('PUT', path, operation, query), 'boolean')
end

#request(host, port, method, path, op = nil, params = {}, payload = nil) ⇒ Object

Server-side exception → HTTP status: IllegalArgumentException → 400 Bad Request; UnsupportedOperationException → 400 Bad Request; SecurityException → 401 Unauthorized; IOException → 403 Forbidden; FileNotFoundException → 404 Not Found; RuntimeException → 500 Internal Server Error.



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/webhdfs/client_v1.rb', line 232

# Sends one HTTP request to +host+:+port+ and maps error statuses to WebHDFS exceptions.
#
# When +op+ is given, the request path is built by build_path (which appends
# +params+ plus the `op`, `user.name` and `doas` query parameters). Otherwise
# +path+ is sent verbatim, which is how operate_requests follows a redirect.
#
# @param host [String]
# @param port [Integer]
# @param method [String] HTTP verb, e.g. 'GET' or 'PUT'
# @param path [String] HDFS path (when +op+ is set) or a ready-made request path
# @param op [String, nil] WebHDFS operation name
# @param params [Hash] extra query parameters; only used when +op+ is set
# @param payload [String, nil] request body
# @return [Net::HTTPResponse] for 2xx (Net::HTTPSuccess) and 3xx (Net::HTTPRedirection) responses
# @raise [WebHDFS::ClientError] on 400
# @raise [WebHDFS::SecurityError] on 401
# @raise [WebHDFS::IOError] on 403
# @raise [WebHDFS::FileNotFoundError] on 404
# @raise [WebHDFS::ServerError] on 500
# @raise [WebHDFS::RequestFailedError] on any other status
def request(host, port, method, path, op=nil, params={}, payload=nil)
  request_path = op ? build_path(path, op, params) : path

  conn = Net::HTTP.new(host, port)
  # Timeouts must be configured before connecting: previously the connection was
  # opened by Net::HTTP.start first, so open_timeout had no effect.
  conn.open_timeout = @open_timeout if @open_timeout
  conn.read_timeout = @read_timeout if @read_timeout

  # Block form of #start closes the socket when the block returns or raises;
  # the old code never closed it and leaked one connection per request.
  # (A leftover debug `p(...)` that printed every request has been removed.)
  res = conn.start { |http| http.send_request(method, request_path, payload) }

  return res if res.is_a?(Net::HTTPSuccess) || res.is_a?(Net::HTTPRedirection)

  message = if res.body && !res.body.empty?
              res.body.delete("\n")
            else
              'Response body is empty...'
            end

  case res.code
  when '400' then raise WebHDFS::ClientError, message
  when '401' then raise WebHDFS::SecurityError, message
  when '403' then raise WebHDFS::IOError, message
  when '404' then raise WebHDFS::FileNotFoundError, message
  when '500' then raise WebHDFS::ServerError, message
  else raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{message}"
  end
end

#stat(path, options = {}) ⇒ Object Also known as: getfilestatus

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”



80
81
82
83
84
# File 'lib/webhdfs/client_v1.rb', line 80

# Fetches the GETFILESTATUS result for +path+.
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters, validated against OPT_TABLE['GETFILESTATUS']
# @return the parsed 'FileStatus' JSON value, or a falsy value when the reply is
#   not a successful JSON response (see check_success_json)
def stat(path, options={})
  operation = 'GETFILESTATUS'
  check_options(options, OPT_TABLE[operation])
  check_success_json(operate_requests('GET', path, operation, options), 'FileStatus')
end

#touch(path, options = {}) ⇒ Object Also known as: settimes

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES

[&modificationtime=<TIME>][&accesstime=<TIME>]"

modificationtime: radix-10 long integer; accesstime: radix-10 long integer



154
155
156
157
158
159
160
161
# File 'lib/webhdfs/client_v1.rb', line 154

# Sets the modification and/or access time of +path+ (WebHDFS SETTIMES, HTTP PUT).
#
# @param path [String] HDFS path
# @param options [Hash] must contain 'modificationtime' and/or 'accesstime';
#   validated against OPT_TABLE['SETTIMES']
# @return [Boolean] true when the final response code is 200
# @raise [ArgumentError] when neither time option is given
def touch(path, options={})
  check_options(options, OPT_TABLE['SETTIMES'])
  unless options.key?('modificationtime') || options.key?('accesstime')
    # The message previously named 'chown' (copy-paste error).
    raise ArgumentError, "'touch' needs at least one of modificationtime or accesstime"
  end
  res = operate_requests('PUT', path, 'SETTIMES', options)
  res.code == '200'
end