Module: IMW::Schemes::HDFS
- Defined in:
- lib/imw/schemes/hdfs.rb
Overview
Defines methods for reading and writing data to/from an HDFS.
Learn more about Hadoop and the Hadoop Distributed Filesystem.
Class Method Summary
-
.extended(obj) ⇒ Object
Checks to see if this is a file or directory.
-
.fs(command, *args) {|String| ... } ⇒ String
Execute command with args on the Hadoop Distributed Filesystem (HDFS).
Instance Method Summary
-
#cp(new_uri) ⇒ IMW::Resource
Copy this resource to the new_uri.
-
#exist? ⇒ true, false
(also: #exists?)
Does this path exist on the HDFS?
-
#is_directory? ⇒ true, false
Is this resource an HDFS directory?
-
#mv(new_uri) ⇒ IMW::Resource
Move this resource to the new_uri.
-
#num_dirs ⇒ Fixnum
Return the number of directories contained at or below this path on the HDFS.
-
#num_files ⇒ Fixnum
Return the number of files contained at or below this path on the HDFS.
-
#on_hdfs? ⇒ true, false
(also: #is_hdfs?)
Is this resource an HDFS resource?
-
#refresh! ⇒ IMW::Resource
Refresh the cached file properties.
-
#rm(options = {}) ⇒ Object
(also: #rm!)
Delete this resource from the HDFS.
-
#size ⇒ Fixnum
Return the size (in bytes) of this resource on the HDFS.
Class Method Details
.extended(obj) ⇒ Object
Checks whether obj is an HDFS file or directory and extends it with HDFSFile or HDFSDirectory accordingly.
    # File 'lib/imw/schemes/hdfs.rb', line 13
    def self.extended obj
      obj.extend(obj.is_directory? ? HDFSDirectory : HDFSFile)
    end
.fs(command, *args) {|String| ... } ⇒ String
Execute command with args on the Hadoop Distributed Filesystem (HDFS).
If passed a block, yields each line of the command's output; otherwise returns the output as a single string.
Try running ‘hadoop fs -help’ for more information.
    # File 'lib/imw/schemes/hdfs.rb', line 143
    def self.fs command, *args
      command_string = "#{executable} fs -#{command} #{args.compact.map(&:to_str).join(' ')}"
      command_string += " 2>&1" if command == :count # FIXME or else it just spams the screen when we do HDFS#refresh!
      output = `#{command_string}`.chomp
      if block_given?
        output.split("\n").each do |line|
          yield line
        end
      else
        output
      end
    end
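The way .fs assembles its command line and handles an optional block can be sketched in isolation. The example below is a minimal standalone sketch, not part of IMW: `build_fs_command` and `each_line_or_output` are hypothetical helpers, the executable name `hadoop` is an assumption, and nothing is actually executed.

```ruby
# Hypothetical helper mirroring how .fs builds its command string.
# It only returns the string; no shell command is run.
def build_fs_command(executable, command, *args)
  # nil arguments are dropped, the rest are joined with single spaces
  command_string = "#{executable} fs -#{command} #{args.compact.map(&:to_str).join(' ')}"
  # as in .fs, stderr is merged into stdout only for :count
  command_string += " 2>&1" if command == :count
  command_string
end

# Hypothetical stand-in for the block handling in .fs:
# with a block, yield each line; without one, return the raw output.
def each_line_or_output(output)
  return output unless block_given?
  output.split("\n").each { |line| yield line }
end

count_cmd = build_fs_command("hadoop", :count, "/user/data")
ls_cmd    = build_fs_command("hadoop", :ls, nil, "/user/data")

lines = []
each_line_or_output("first\nsecond") { |line| lines << line }
whole = each_line_or_output("first\nsecond")
```

Note that arguments are interpolated into the string without shell escaping, so paths containing spaces or shell metacharacters would need quoting by the caller.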
Instance Method Details
#cp(new_uri) ⇒ IMW::Resource
Copy this resource to the new_uri.
    # File 'lib/imw/schemes/hdfs.rb', line 29
    def cp new_uri
      IMW::Transforms::Transferer.new(:cp, self, new_uri).transfer!
    end
#exist? ⇒ true, false Also known as: exists?
Does this path exist on the HDFS?
    # File 'lib/imw/schemes/hdfs.rb', line 58
    def exist?
      return @exist unless @exist.nil?
      refresh!
      @exist
    end
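The `nil?` check in #exist? matters because `false` is a legitimate cached answer. The sketch below illustrates the pattern with a hypothetical stand-in class (`CachedExistence` is not part of IMW), where `lookups` counts simulated round trips to the HDFS.

```ruby
# Hypothetical stand-in class illustrating the nil-sentinel cache.
class CachedExistence
  attr_reader :lookups

  def initialize(exists_on_hdfs)
    @exists_on_hdfs = exists_on_hdfs
    @exist = nil # nil means "not checked yet"; false means "checked, missing"
    @lookups = 0
  end

  # Simulates the expensive query and stores its result.
  def refresh!
    @lookups += 1
    @exist = @exists_on_hdfs
    self
  end

  # Same shape as #exist?: only refresh when nothing is cached.
  def exist?
    return @exist unless @exist.nil?
    refresh!
    @exist
  end
end

missing = CachedExistence.new(false)
first_answer  = missing.exist?
second_answer = missing.exist?
```

Had the cache used `@exist ||= ...` instead, a missing path would trigger a new query on every call, since `false` is falsy.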
#is_directory? ⇒ true, false
Is this resource an HDFS directory?
    # File 'lib/imw/schemes/hdfs.rb', line 110
    def is_directory?
      exist? && num_dirs > 0
    end
#mv(new_uri) ⇒ IMW::Resource
Move this resource to the new_uri.
    # File 'lib/imw/schemes/hdfs.rb', line 37
    def mv new_uri
      IMW::Transforms::Transferer.new(:mv, self, new_uri).transfer!
    end
#num_dirs ⇒ Fixnum
Return the number of directories contained at or below this path on the HDFS.
This value is cached. Call refresh! to refresh the cache manually.
    # File 'lib/imw/schemes/hdfs.rb', line 86
    def num_dirs
      return @num_dirs unless @num_dirs.nil?
      refresh!
      should_exist!("Cannot report number of directories.")
      @num_dirs
    end
#num_files ⇒ Fixnum
Return the number of files contained at or below this path on the HDFS.
This value is cached. Call refresh! to refresh the cache manually.
    # File 'lib/imw/schemes/hdfs.rb', line 100
    def num_files
      return @num_files unless @num_files.nil?
      refresh!
      should_exist!("Cannot report number of files.")
      @num_files
    end
#on_hdfs? ⇒ true, false Also known as: is_hdfs?
Is this resource an HDFS resource?
    # File 'lib/imw/schemes/hdfs.rb', line 20
    def on_hdfs?
      true
    end
#refresh! ⇒ IMW::Resource
Refresh the cached file properties.
    # File 'lib/imw/schemes/hdfs.rb', line 117
    def refresh!
      response = HDFS.fs(:count, path)
      if response.blank? || response =~ /^Can not find listing for/
        @exist = false
        @num_dirs, @num_files, @size, @hdfs_path = false, false, false, false
      else
        @exist = true
        parts = response.split
        @num_dirs, @num_files, @size = parts[0..2].map(&:to_i)
        @hdfs_path = parts.last
      end
      self
    end
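The parsing step in #refresh! can be tried without a Hadoop installation. The following is a minimal sketch under stated assumptions: `parse_count_response` is a hypothetical helper, the sample line is invented to match the four whitespace-separated columns that the method reads (directory count, file count, size, path), and `strip.empty?` stands in for `blank?`, which plain Ruby does not provide.

```ruby
# Hypothetical parser mirroring the two branches of #refresh!.
def parse_count_response(response)
  if response.strip.empty? || response =~ /^Can not find listing for/
    # empty or error output: the path is treated as non-existent
    { exist: false }
  else
    parts = response.split
    num_dirs, num_files, size = parts[0..2].map(&:to_i)
    { exist: true, num_dirs: num_dirs, num_files: num_files,
      size: size, hdfs_path: parts.last }
  end
end

# Invented sample output line; real output depends on the cluster.
sample = "           3           12             4096 hdfs://namenode/user/data"
parsed = parse_count_response(sample)
absent = parse_count_response("")

# Same test as #is_directory?: exists and reports at least one directory.
directory = parsed[:exist] && parsed[:num_dirs] > 0
```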
#rm(options = {}) ⇒ Object Also known as: rm!
Delete this resource from the HDFS.
    # File 'lib/imw/schemes/hdfs.rb', line 44
    def rm options={}
      should_exist!("Cannot delete.")
      args = [:rm]
      args << '-skipTrash' if options[:skip] || options[:skip_trash] || options[:skipTrash]
      args << path
      HDFS.fs(*args)
      self
    end
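How the options hash named in the signature `rm(options = {})` turns into arguments for .fs can be sketched as follows. `build_rm_args` is a hypothetical helper, not part of IMW, and the paths are made up; the sketch only builds the argument list and deletes nothing.

```ruby
# Hypothetical helper mirroring how #rm assembles its argument list.
def build_rm_args(path, options = {})
  args = [:rm]
  # any of the three option spellings requests bypassing the trash
  args << '-skipTrash' if options[:skip] || options[:skip_trash] || options[:skipTrash]
  args << path
  args
end

plain_args = build_rm_args("/tmp/old")
skip_args  = build_rm_args("/tmp/old", skip_trash: true)
```

The first element becomes the command passed to .fs and the remaining elements become its arguments.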
#size ⇒ Fixnum
Return the size (in bytes) of this resource on the HDFS.
This value is cached. Call refresh! to refresh the cache manually.
    # File 'lib/imw/schemes/hdfs.rb', line 72
    def size
      return @size unless @size.nil?
      refresh!
      should_exist!("Cannot report size")
      @size
    end