Module: IMW::Schemes::HDFS

Defined in:
lib/imw/schemes/hdfs.rb

Overview

Defines methods for reading data from and writing data to the Hadoop Distributed Filesystem (HDFS).

Learn more about Hadoop and the Hadoop Distributed Filesystem.

Class Method Details

.extended(obj) ⇒ Object

Checks whether obj is a file or a directory and extends it with HDFSFile or HDFSDirectory accordingly.



# File 'lib/imw/schemes/hdfs.rb', line 13

def self.extended obj
  obj.extend(obj.is_directory? ? HDFSDirectory : HDFSFile)
end
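To illustrate how the extended hook dispatches, here is a minimal sketch. It uses hypothetical stand-ins (DemoScheme, DemoDirectory, DemoFile, DemoResource) rather than the real IMW modules, and it decides directory status from a constructor flag instead of querying HDFS.

```ruby
# Hypothetical stand-ins for HDFSDirectory and HDFSFile.
module DemoDirectory
  def kind
    :directory
  end
end

module DemoFile
  def kind
    :file
  end
end

module DemoScheme
  # Ruby calls this hook each time an object is extended with DemoScheme.
  # By then the module's instance methods are already available on obj.
  def self.extended(obj)
    obj.extend(obj.is_directory? ? DemoDirectory : DemoFile)
  end

  def is_directory?
    @directory
  end
end

# Hypothetical resource; the flag replaces the real HDFS lookup.
class DemoResource
  def initialize(directory)
    @directory = directory
  end
end

dir  = DemoResource.new(true).extend(DemoScheme)
file = DemoResource.new(false).extend(DemoScheme)

dir.kind   # => :directory
file.kind  # => :file
```

Because the choice is made once, at extension time, an object keeps its directory or file behavior even if the underlying path later changes type.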

.fs(command, *args) {|String| ... } ⇒ String

Execute command with args on the Hadoop Distributed Filesystem (HDFS).

If a block is given, yields each line of the command's output; otherwise returns the output as a single string.

Try running ‘hadoop fs -help’ for more information.

Yields:

  • (String)

    each line of the command’s output



# File 'lib/imw/schemes/hdfs.rb', line 143

def self.fs command, *args
  command_string = "#{executable} fs -#{command} #{args.compact.map(&:to_str).join(' ')}"
  command_string += " 2>&1" if command == :count # FIXME or else it just spams the screen when we do HDFS#refresh!
  output = `#{command_string}`.chomp
  if block_given?
    output.split("\n").each do |line|
      yield line
    end
  else
    output
  end
end
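The two halves of .fs, building the command string and then either yielding lines or returning the whole output, can be sketched without a Hadoop installation. The helper names build_command and each_line_or_all are hypothetical, the 'hadoop' executable name is an assumption (the module's own executable method is not shown here), and the sample text stands in for what the backtick call would capture.

```ruby
# Hypothetical helper: assembles the command string the same way .fs does.
# nil arguments are dropped by compact before joining.
def build_command(executable, command, *args)
  "#{executable} fs -#{command} #{args.compact.map(&:to_str).join(' ')}"
end

# Hypothetical helper: mirrors how .fs treats captured output.
# `raw` stands in for the text returned by the shell call.
def each_line_or_all(raw)
  output = raw.chomp
  return output unless block_given?
  output.split("\n").each { |line| yield line }
end

command = build_command('hadoop', :ls, '/data', nil)
# => "hadoop fs -ls /data"

sample = "first line\nsecond line\n"   # made-up output, not real HDFS data
whole  = each_line_or_all(sample)      # no block: one string
lines  = []
each_line_or_all(sample) { |line| lines << line }  # block: one call per line
```

Note that the arguments are interpolated into a shell string without escaping, so paths containing spaces or shell metacharacters would need quoting by the caller.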

Instance Method Details

#cp(new_uri) ⇒ IMW::Resource

Copy this resource to the new_uri.



# File 'lib/imw/schemes/hdfs.rb', line 29

def cp new_uri
  IMW::Tools::Transferer.new(:cp, self, new_uri).transfer!
end

#exist? ⇒ true, false Also known as: exists?

Does this path exist on the HDFS?



# File 'lib/imw/schemes/hdfs.rb', line 58

def exist?
  return @exist unless @exist.nil?
  refresh!
  @exist
end
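The explicit nil check matters because false is a legitimate cached answer; an idiom such as `@exist ||= ...` would repeat the lookup every time the path does not exist. A minimal sketch of the pattern, using a hypothetical CachedFlag class whose refresh! counts lookups instead of calling HDFS:

```ruby
# Hypothetical class; @answer replaces the real `hadoop fs -count` lookup.
class CachedFlag
  attr_reader :lookups

  def initialize(answer)
    @answer  = answer
    @lookups = 0
  end

  # Same shape as #exist?: only nil means "not cached yet".
  def exist?
    return @exist unless @exist.nil?
    refresh!
    @exist
  end

  def refresh!
    @lookups += 1
    @exist = @answer
    self
  end
end

missing = CachedFlag.new(false)
missing.exist?
missing.exist?
missing.lookups  # => 1
```

The second call is served from the cache even though the cached value is false.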

#is_directory? ⇒ true, false

Is this resource an HDFS directory?



# File 'lib/imw/schemes/hdfs.rb', line 110

def is_directory?
  exist? && num_dirs > 0
end

#mv(new_uri) ⇒ IMW::Resource

Move this resource to the new_uri.



# File 'lib/imw/schemes/hdfs.rb', line 37

def mv new_uri
  IMW::Tools::Transferer.new(:mv, self, new_uri).transfer!
end

#num_dirs ⇒ Fixnum

Return the number of directories contained at or below this path on the HDFS.

This value is cached. Call refresh! to refresh the cache manually.



# File 'lib/imw/schemes/hdfs.rb', line 86

def num_dirs
  return @num_dirs unless @num_dirs.nil?
  refresh!
  should_exist!("Cannot report number of directories.")
  @num_dirs
end

#num_files ⇒ Fixnum

Return the number of files contained at or below this path on the HDFS.

This value is cached. Call refresh! to refresh the cache manually.



# File 'lib/imw/schemes/hdfs.rb', line 100

def num_files
  return @num_files unless @num_files.nil?
  refresh!
  should_exist!("Cannot report number of files.")
  @num_files
end

#on_hdfs? ⇒ true, false Also known as: is_hdfs?

Is this resource an HDFS resource?



# File 'lib/imw/schemes/hdfs.rb', line 20

def on_hdfs?
  true
end

#refresh! ⇒ IMW::Resource

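Refresh the cached file properties (existence, directory count, file count, size, and HDFS path) by running 'hadoop fs -count' on this path.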



# File 'lib/imw/schemes/hdfs.rb', line 117

def refresh!
  response = HDFS.fs(:count, path)
  if response.blank? || response =~ /^Can not find listing for/
    @exist = false
    @num_dirs, @num_files, @size, @hdfs_path = false, false, false, false
  else
    @exist = true
    parts = response.split
    @num_dirs, @num_files, @size = parts[0..2].map(&:to_i)
    @hdfs_path = parts.last
  end
  self
end
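The parsing step in refresh! relies on 'hadoop fs -count' printing four whitespace-separated columns: DIR_COUNT, FILE_COUNT, CONTENT_SIZE, and PATHNAME. The sketch below isolates that step in a hypothetical parse_count helper that returns a hash instead of setting instance variables. The sample lines are made up, and strip.empty? replaces blank? as an assumption so that the example does not depend on ActiveSupport.

```ruby
# Hypothetical helper; `response` is a sample line in the shape printed by
# `hadoop fs -count`: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME.
def parse_count(response)
  # Empty output or the "Can not find listing" message means the path is absent.
  if response.strip.empty? || response =~ /^Can not find listing for/
    return { exist: false }
  end

  parts = response.split
  num_dirs, num_files, size = parts[0..2].map(&:to_i)
  { exist: true, num_dirs: num_dirs, num_files: num_files,
    size: size, hdfs_path: parts.last }
end

# Made-up sample output, not taken from a real cluster.
found   = parse_count("           3           12         4096 hdfs://namenode/data/logs")
missing = parse_count("Can not find listing for /no/such/path")
```

In the real method the missing case stores false rather than nil in the cached fields, which is what lets the nil checks in the accessors treat "known to be absent" as a cached state.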

#rm(options = {}) ⇒ Object Also known as: rm!

Delete this resource from the HDFS.

Options Hash (options):

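  • :skip_trash (true, false): when true, passes -skipTrash so the resource is deleted immediately instead of being moved to the trash. The keys :skip and :skipTrash are accepted as aliases.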


# File 'lib/imw/schemes/hdfs.rb', line 44

def rm options={}
  should_exist!("Cannot delete.")
  args = [:rm]
  args << '-skipTrash' if options[:skip] || options[:skip_trash] || options[:skipTrash]
  args << path
  HDFS.fs(*args)
  self
end
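How the options map onto the arguments handed to HDFS.fs can be seen in isolation. The rm_args helper below is hypothetical: it only builds the argument list and never touches a filesystem, and the path is a made-up example.

```ruby
# Hypothetical helper mirroring the argument construction in #rm.
# Nothing is deleted; only the argument list is built.
def rm_args(path, options = {})
  args = [:rm]
  # Any of the three option keys enables -skipTrash.
  args << '-skipTrash' if options[:skip] || options[:skip_trash] || options[:skipTrash]
  args << path
  args
end

plain    = rm_args('/tmp/old')
# => [:rm, "/tmp/old"]
no_trash = rm_args('/tmp/old', skip_trash: true)
# => [:rm, "-skipTrash", "/tmp/old"]
```

The first element becomes the command passed to HDFS.fs and the rest become its arguments, so the second list corresponds to a command of the form 'fs -rm -skipTrash /tmp/old'.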

#size ⇒ Fixnum

Return the size (in bytes) of this resource on the HDFS.

This value is cached. Call refresh! to refresh the cache manually.



# File 'lib/imw/schemes/hdfs.rb', line 72

def size
  return @size unless @size.nil?
  refresh!
  should_exist!("Cannot report size")
  @size
end