Module: IMW::Resources::Schemes::HDFS

Defined in:
lib/imw/resources/schemes/hdfs.rb

Overview

Defines methods for reading and writing data to/from an HDFS.

Learn more about Hadoop and the Hadoop Distributed Filesystem.

Class Method Summary

Instance Method Summary

Class Method Details

.extended(obj) ⇒ Object

Checks whether obj is a file or a directory on the HDFS and extends it with HDFSFile or HDFSDirectory accordingly.



# File 'lib/imw/resources/schemes/hdfs.rb', line 14

def self.extended obj
  obj.extend(obj.is_directory? ? HDFSDirectory : HDFSFile)
end
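The hook above relies on Ruby calling `extended` only after the module has been mixed in, so `obj` already responds to `is_directory?` at that point. A minimal, self-contained sketch of the same dispatch pattern follows. All names in it (`SketchScheme`, `SketchDirectory`, `SketchFile`, `SketchResource`) are hypothetical stand-ins, and the trailing-slash test is an assumption made only so the example runs without a Hadoop installation.

```ruby
# Hypothetical stand-ins for HDFSDirectory and HDFSFile.
module SketchDirectory
  def kind
    :directory
  end
end

module SketchFile
  def kind
    :file
  end
end

module SketchScheme
  # Ruby invokes this hook after `obj.extend(SketchScheme)` has mixed the
  # module in, so obj already responds to is_directory? here.
  def self.extended(obj)
    obj.extend(obj.is_directory? ? SketchDirectory : SketchFile)
  end

  # Assumption for illustration only: the real method asks the HDFS.
  def is_directory?
    path.end_with?('/')
  end
end

# Hypothetical resource type that only carries a path.
SketchResource = Struct.new(:path)

dir = SketchResource.new('/data/logs/')
dir.extend(SketchScheme)
file = SketchResource.new('/data/logs/part-00000')
file.extend(SketchScheme)

puts dir.kind
puts file.kind
```

Choosing the module once at extension time means later calls need no per-call branching on the resource type.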

.fs(command, *args) {|String| ... } ⇒ String

Execute command with args on the Hadoop Distributed Filesystem (HDFS).

If passed a block, yield each line of the output from the command, else just return the output.

Try running ‘hadoop fs -help’ for more information.

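Parameters:

  • command

    the ‘hadoop fs’ subcommand to run, given without its leading dash (for example :count)

  • args

    arguments appended to the command; nil entries are dropped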

Yields:

  • (String)

    each line of the command’s output

Returns:

  • (String)

    the command’s output



# File 'lib/imw/resources/schemes/hdfs.rb', line 144

def self.fs command, *args
  command_string = "#{executable} fs -#{command} #{args.compact.map(&:to_str).join(' ')}"
  command_string += " 2>&1" if command == :count # FIXME or else it just spams the screen when we do HDFS#refresh!
  output = `#{command_string}`.chomp
  if block_given?
    output.split("\n").each do |line|
      yield line
    end
  else
    output
  end
end
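The listing above interpolates its arguments into a shell string and then either returns the output or yields it line by line. The sketch below illustrates both behaviours without requiring Hadoop. It is not the library's implementation: `build_fs_command` and `run_fs` are hypothetical helpers, the `runner` callable stands in for the backtick call, and the arguments are shell-escaped with the standard `Shellwords` module, which the original does not do.

```ruby
require 'shellwords'

# Hypothetical helper: builds "<executable> fs -<command> <args>" and
# shell-escapes every argument (the original joins them unescaped).
def build_fs_command(executable, command, *args)
  escaped = args.compact.map { |arg| Shellwords.escape(arg.to_s) }
  [executable, 'fs', "-#{command}", *escaped].join(' ')
end

# Hypothetical helper mirroring the block/no-block behaviour of .fs.
# `runner` is any callable that takes a command string and returns output.
def run_fs(runner, executable, command, *args)
  output = runner.call(build_fs_command(executable, command, *args)).chomp
  return output unless block_given?

  output.split("\n").each { |line| yield line }
end

# A fake runner, so the example never shells out.
fake_runner = ->(cmd) { "ran: #{cmd}\nsecond line\n" }

# Without a block the whole output comes back as one string.
puts run_fs(fake_runner, 'hadoop', :ls, '/data/my dir')

# With a block each line is yielded in turn.
run_fs(fake_runner, 'hadoop', :ls, '/data') { |line| puts "line: #{line}" }
```

Passing the runner in as a callable keeps the command construction testable on machines where the `hadoop` executable is not installed.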

Instance Method Details

#cp(new_uri) ⇒ IMW::Resource

Copy this resource to the new_uri.

Parameters:

  • new_uri

    the URI to copy this resource to

Returns:

  • (IMW::Resource)



# File 'lib/imw/resources/schemes/hdfs.rb', line 30

def cp new_uri
  IMW::Transforms::Transferer.new(:cp, self, new_uri).transfer!
end

#exist? ⇒ true, false

Also known as: exists?

Does this path exist on the HDFS?

Returns:

  • (true, false)


# File 'lib/imw/resources/schemes/hdfs.rb', line 59

def exist?
  return @exist unless @exist.nil?
  refresh!
  @exist
end

#is_directory? ⇒ true, false

Is this resource an HDFS directory?

Returns:

  • (true, false)


# File 'lib/imw/resources/schemes/hdfs.rb', line 111

def is_directory?
  exist? && num_dirs > 0
end

#mv(new_uri) ⇒ IMW::Resource

Move this resource to the new_uri.

Parameters:

  • new_uri

    the URI to move this resource to

Returns:

  • (IMW::Resource)



# File 'lib/imw/resources/schemes/hdfs.rb', line 38

def mv new_uri
  IMW::Transforms::Transferer.new(:mv, self, new_uri).transfer!
end

#num_dirs ⇒ Fixnum

Return the number of directories contained at or below this path on the HDFS.

This value is cached. Call refresh! to refresh the cache manually.

Returns:

  • (Fixnum)


# File 'lib/imw/resources/schemes/hdfs.rb', line 87

def num_dirs
  return @num_dirs unless @num_dirs.nil?
  refresh!
  should_exist!("Cannot report number of directories.")
  @num_dirs
end
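The caching used by this accessor (and by num_files, size and exist?) is a nil-check memoization: the stored value is returned when present, and refresh! is called only on first access or on demand. A minimal sketch of that pattern follows, with a hypothetical `CachedCounter` class and a block standing in for the HDFS query.

```ruby
# Hypothetical class illustrating the nil-check cache; the block passed
# to new stands in for the real query against the HDFS.
class CachedCounter
  attr_reader :refresh_calls

  def initialize(&fetch)
    @fetch = fetch
    @refresh_calls = 0
    @num_dirs = nil
  end

  # Returns the cached value, fetching it only if nothing is cached yet.
  def num_dirs
    return @num_dirs unless @num_dirs.nil?

    refresh!
    @num_dirs
  end

  # Re-runs the query and overwrites the cache; returns self so calls chain.
  def refresh!
    @refresh_calls += 1
    @num_dirs = @fetch.call
    self
  end
end

counter = CachedCounter.new { 3 }
counter.num_dirs                 # first access triggers a refresh
counter.num_dirs                 # served from the cache
puts counter.refresh_calls       # prints 1
puts counter.refresh!.num_dirs   # explicit refresh, then prints 3
```

Because the check is for nil rather than truthiness, a cached false (as set for a missing path) also counts as a cached value and does not trigger another query.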

#num_files ⇒ Fixnum

Return the number of files contained at or below this path on the HDFS.

This value is cached. Call refresh! to refresh the cache manually.

Returns:

  • (Fixnum)


# File 'lib/imw/resources/schemes/hdfs.rb', line 101

def num_files
  return @num_files unless @num_files.nil?
  refresh!
  should_exist!("Cannot report number of files.")
  @num_files
end

#on_hdfs? ⇒ true, false

Also known as: is_hdfs?

Is this resource an HDFS resource?

Returns:

  • (true, false)


# File 'lib/imw/resources/schemes/hdfs.rb', line 21

def on_hdfs?
  true
end

#refresh! ⇒ IMW::Resource

Refresh the cached file properties.

Returns:

  • (IMW::Resource)

    this resource, with its cached properties updated



# File 'lib/imw/resources/schemes/hdfs.rb', line 118

def refresh!
  response = HDFS.fs(:count, path)
  if response.blank? || response =~ /^Can not find listing for/
    @exist = false
    @num_dirs, @num_files, @size, @hdfs_path = false, false, false, false
  else
    @exist = true
    parts = response.split
    @num_dirs, @num_files, @size = parts[0..2].map(&:to_i)
    @hdfs_path = parts.last
  end
  self
end
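The parsing step in refresh! can be tried in isolation. The sketch below assumes the response is a single line of ‘hadoop fs -count’ output whose columns are DIR_COUNT, FILE_COUNT, CONTENT_SIZE and PATHNAME. The helper name `parse_count` and the sample line are hypothetical, and a plain emptiness check replaces `blank?` so that no extra library is needed.

```ruby
# Hypothetical parser for one line of `hadoop fs -count` output.
# Returns a hash instead of setting instance variables.
def parse_count(response)
  missing = response.nil? || response.strip.empty? ||
            response =~ /^Can not find listing for/
  return { exist: false } if missing

  parts = response.split
  num_dirs, num_files, size = parts[0..2].map(&:to_i)
  { exist: true, num_dirs: num_dirs, num_files: num_files,
    size: size, hdfs_path: parts.last }
end

# Made-up sample line, for illustration only.
sample = '           3           12            4096 hdfs://namenode/data/logs'
info = parse_count(sample)
puts info[:num_dirs]
puts info[:hdfs_path]

# An empty response is treated as a path that does not exist.
puts parse_count('')[:exist]
```

The `num_dirs` value parsed here is the one that is_directory? compares against zero.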

#rm(options = {}) ⇒ Object

Also known as: rm!

Delete this resource from the HDFS.

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

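  • :skip_trash (true, false)

    if true, pass -skipTrash so the resource is deleted without going through the trash; :skip and :skipTrash are accepted as aliases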


# File 'lib/imw/resources/schemes/hdfs.rb', line 45

def rm options={}
  should_exist!("Cannot delete.")
  args = [:rm]
  args << '-skipTrash' if options[:skip] || options[:skip_trash] || options[:skipTrash]
  args << path
  HDFS.fs(*args)
  self
end
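The way the options hash maps onto command arguments can be shown without touching a filesystem. This sketch uses a hypothetical helper, `rm_arguments`, that returns the argument list the method above would hand to HDFS.fs. It deliberately omits the existence check and the actual command execution.

```ruby
# Hypothetical helper: builds the argument list for a delete, adding
# -skipTrash when any of the three accepted option keys is truthy.
def rm_arguments(path, options = {})
  args = [:rm]
  args << '-skipTrash' if options[:skip] || options[:skip_trash] || options[:skipTrash]
  args << path
  args
end

p rm_arguments('/data/old')                    # [:rm, "/data/old"]
p rm_arguments('/data/old', skip_trash: true)  # [:rm, "-skipTrash", "/data/old"]
```

The flag has to come before the path because the arguments are joined in order into a single command line.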

#size ⇒ Fixnum

Return the size (in bytes) of this resource on the HDFS.

This value is cached. Call refresh! to refresh the cache manually.

Returns:

  • (Fixnum)


# File 'lib/imw/resources/schemes/hdfs.rb', line 73

def size
  return @size unless @size.nil?
  refresh!
  should_exist!("Cannot report size")
  @size
end