Class: Visor::Image::Store::HDFS

Inherits:
Object
  • Object
show all
Includes:
Common::Exception
Defined in:
lib/image/store/hdfs.rb

Overview

The Apache Hadoop HDFS (HDFS) backend store.

This class handles the management of image files located in the HDFS storage system, based on a URI like *hdfs://<username>@<host>:<port>/<path>/<bucket>/<image>*.

10.0.3.12:50075/webhdfs/v1/foo/1.iso?op=OPEN&user.name=hadoop&offset=0

Constant Summary collapse

CONTEXT_ROOT =
"webhdfs/v1"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, config) ⇒ Object

Initializes a new HDFS store client object. HDFS credentials are loaded from the URI on GET and DELETE operations, or from the configuration file on POST and PUT operations.

Parameters:

  • uri (String)

    The URI of the file location.

  • config (Hash)

    A set of configurations for the wanted store, loaded from VISoR configuration file.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/image/store/hdfs.rb', line 32

# Builds an HDFS store client from a location URI and the VISoR configuration.
#
# When the URI carries a scheme, the connection details (user, host, port,
# base path, bucket and file name) are taken from the URI itself. Otherwise
# they come from the :hdfs section of the configuration and the base path
# defaults to CONTEXT_ROOT; in that case @file is left unset.
#
# @param uri [String] The URI of the file location.
# @param config [Hash] Store configurations; only the :hdfs entry is used.
def initialize(uri, config)
  @uri    = URI(uri)
  @config = config[:hdfs]

  if @uri.scheme.nil?
    # No scheme: fall back to the configured HDFS settings.
    @username = @config[:username]
    @bucket   = @config[:bucket]
    @base     = CONTEXT_ROOT
    @host     = @config[:host]
    @port     = @config[:port]
  else
    # The path looks like /<base a>/<base b>/<bucket>/<file>; index 0 is the
    # empty string before the leading slash.
    segments  = @uri.path.split('/')
    @username = @uri.user
    @base     = segments[1..2].join('/')
    @bucket   = segments[3]
    @file     = segments[4]
    @host     = @uri.host
    @port     = @uri.port
  end
end

Instance Attribute Details

#baseObject

Returns the value of attribute base.



21
22
23
# File 'lib/image/store/hdfs.rb', line 21

# Returns the path prefix placed between "host:port" and the bucket when
# request URIs are built. The constructor takes it from the second and third
# path segments of the URI, or uses CONTEXT_ROOT when the URI has no scheme.
def base
  @base
end

#bucketObject

Returns the value of attribute bucket.



21
22
23
# File 'lib/image/store/hdfs.rb', line 21

# Returns the bucket name: the fourth path segment of the URI, or the
# :bucket entry of the HDFS configuration when the URI has no scheme.
def bucket
  @bucket
end

#configObject

Returns the value of attribute config.



21
22
23
# File 'lib/image/store/hdfs.rb', line 21

# Returns the :hdfs section of the configuration hash given to the constructor.
def config
  @config
end

#fileObject

Returns the value of attribute file.



21
22
23
# File 'lib/image/store/hdfs.rb', line 21

# Returns the image file name: the fifth path segment of the URI, or the
# "<id>.<format>" name assigned by #save. It is nil when the store was built
# from a scheme-less URI and #save has not run yet.
def file
  @file
end

#hostObject

Returns the value of attribute host.



21
22
23
# File 'lib/image/store/hdfs.rb', line 21

# Returns the HDFS host: taken from the URI, or from the :host entry of the
# HDFS configuration when the URI has no scheme.
def host
  @host
end

#portObject

Returns the value of attribute port.



21
22
23
# File 'lib/image/store/hdfs.rb', line 21

# Returns the HDFS port: taken from the URI, or from the :port entry of the
# HDFS configuration when the URI has no scheme.
def port
  @port
end

#uriObject

Returns the value of attribute uri.



21
22
23
# File 'lib/image/store/hdfs.rb', line 21

# Returns the location given to the constructor, parsed into a URI object.
def uri
  @uri
end

#usernameObject

Returns the value of attribute username.



21
22
23
# File 'lib/image/store/hdfs.rb', line 21

# Returns the user name sent as the "user.name" request parameter: the user
# part of the URI, or the :username entry of the HDFS configuration when the
# URI has no scheme.
def username
  @username
end

Instance Method Details

#deleteObject

Deletes the image file from its location.

Raises:

  • (NotFound)

    If the image file was not found.



118
119
120
121
# File 'lib/image/store/hdfs.rb', line 118

# Deletes the image file from its location.
#
# The existence check runs first so that a missing file is reported as
# NotFound, as the documented contract promises, instead of silently issuing
# a DELETE request for a path that is not there.
#
# @return [Object] The result of the HTTP DELETE request.
# @raise [NotFound] If the image file was not found.
def delete
  file_exists?
  uri = generate_uri('op=DELETE&recursive=true')
  EventMachine::HttpRequest.new(uri).delete
end

#file_exists?(raise_exc = true) ⇒ True, False

Check if the image file exists.

Parameters:

  • raise_exc (True, False) (defaults to: true)

    (true) Whether to raise an exception when the file does not exist (true), or just return true/false to indicate whether it exists (false).

Returns:

  • (True, False)

    If raise_exc is false, return true/false whether the file exists or not.

Raises:

  • (NotFound)

    If the image file was not found.



133
134
135
136
137
138
139
140
# File 'lib/image/store/hdfs.rb', line 133

# Checks whether the image file exists by sending a GETFILESTATUS request;
# any successful HTTP response counts as "exists".
#
# @param raise_exc [true, false] (true) Raise NotFound when the file is
#   missing instead of returning false.
# @return [true, false] Whether the file exists.
# @raise [NotFound] If the file was not found and raise_exc is true.
def file_exists?(raise_exc=true)
  status_uri = generate_uri('op=GETFILESTATUS')
  request    = Net::HTTP::Get.new(status_uri.request_uri)
  client     = Net::HTTP.new(status_uri.hostname, status_uri.port)
  found      = client.request(request).is_a?(Net::HTTPSuccess)

  return found if found || !raise_exc

  raise NotFound, "No image file found at #{status_uri}"
end

#generate_uri(params) ⇒ Object



142
143
144
# File 'lib/image/store/hdfs.rb', line 142

# Builds the HTTP URI for a request on the current image file.
#
# @param params [String] The query string of the operation (e.g. "op=OPEN");
#   the "user.name" parameter is appended after it.
# @return [URI] http://<host>:<port>/<base>/<bucket>/<file>?<params>&user.name=<username>
def generate_uri(params)
  resource = [base, bucket, file].join('/')
  query    = "#{params}&user.name=#{username}"
  URI("http://#{host}:#{port}/#{resource}?#{query}")
end

#getObject

Returns the image file to clients, streamed in chunks.

Returns:

  • (Object)

    Yields the file, a chunk at a time.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/image/store/hdfs.rb', line 57

# Streams the image file to the caller's block, one chunk at a time, and
# yields nil once the transfer finishes (on success or on error).
#
# NOTE: the OPEN request should really be sent to the address generated by
# generate_uri and the 'Location' header of its response followed, with the
# hostname replaced by +host+. Two attempts at that were left aside:
#   * EventMachine::HttpRequest.new(uri).get raised
#     "can't yield from root fiber";
#   * a blocking Net::HTTP::Get request worked and could replace the
#     hard-coded location below:
#       req          = Net::HTTP::Get.new(uri.request_uri)
#       res          = Net::HTTP.new(uri.hostname, uri.port).request(req)
#       url          = URI(res['location'])
#       url.hostname = host
# For now the final location is defined manually (port 50075, offset 0).
#
# @return [Object] Yields the file, a chunk at a time.
def get
  location      = generate_uri('op=OPEN')
  location.port = 50075
  target        = "#{location}&offset=0"

  request   = EventMachine::HttpRequest.new(target).aget
  terminate = proc { yield nil }

  request.stream { |data| yield data }
  request.callback(&terminate)
  request.errback(&terminate)
end

#save(id, tmp_file, format) ⇒ String, Integer

Saves the image file to its final destination, based on the temporary file created by the server at data reception time.

Parameters:

  • id (String)

    The image id.

  • tmp_file (File)

    The temporary file descriptor.

  • format (String)

    The image file format.

Returns:

  • (String, Integer)

    The generated file location URI and image file size.

Raises:

  • (Duplicated)

    If the image file already exists.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/image/store/hdfs.rb', line 97

# Saves the image file to its final destination, based on the temporary file
# created by the server at data reception time.
#
# The upload takes two requests: a CREATE request sent to the address built
# by generate_uri, whose 'LOCATION' response header gives the address that
# receives the data, followed by a PUT of the file contents to that address.
#
# @param id [String] The image id.
# @param tmp_file [File] The temporary file descriptor.
# @param format [String] The image file format.
# @return [String, Integer] The generated file location URI and image file size.
# @raise [Duplicated] If the image file already exists.
def save(id, tmp_file, format)
  @file = "#{id}.#{format}"
  uri   = "hdfs://#{username}@#{host}:#{port}/#{base}/#{bucket}/#{file}"
  size  = tmp_file.size

  # Fail fast: check for a duplicate before any CREATE request is issued.
  raise Duplicated, "The image file #{uri} already exists" if file_exists?(false)

  path     = generate_uri('op=CREATE&overwrite=true')
  http     = EventMachine::HttpRequest.new(path).put
  location = URI(http.response_header['LOCATION'])

  # Replace the host name returned in the redirect with the configured host.
  location.hostname = host

  EventMachine::HttpRequest.new(location).put :file => tmp_file.path
  [uri, size]
end