Module: IMW::Resources::Schemes::HDFS
- Defined in: lib/imw/resources/schemes/hdfs.rb
Overview
Defines methods for reading and writing data to/from an HDFS.
Learn more about Hadoop and the Hadoop Distributed Filesystem.
Class Method Summary
-
.extended(obj) ⇒ Object
Checks to see if this is a file or directory.
-
.fs(command, *args) {|String| ... } ⇒ String
Execute command with args on the Hadoop Distributed Filesystem (HDFS).
Instance Method Summary
-
#cp(new_uri) ⇒ IMW::Resource
Copy this resource to the new_uri.
-
#exist? ⇒ true, false
(also: #exists?)
Does this path exist on the HDFS?
-
#is_directory? ⇒ true, false
Is this resource an HDFS directory?
-
#mv(new_uri) ⇒ IMW::Resource
Move this resource to the new_uri.
-
#num_dirs ⇒ Fixnum
Return the number of directories contained at or below this path on the HDFS.
-
#num_files ⇒ Fixnum
Return the number of files contained at or below this path on the HDFS.
-
#on_hdfs? ⇒ true, false
(also: #is_hdfs?)
Is this resource an HDFS resource?
-
#refresh! ⇒ IMW::Resource
Refresh the cached file properties.
-
#rm(options = {}) ⇒ Object
(also: #rm!)
Delete this resource from the HDFS.
-
#size ⇒ Fixnum
Return the size (in bytes) of this resource on the HDFS.
Class Method Details
.extended(obj) ⇒ Object
Checks to see if this is a file or directory.
# File 'lib/imw/resources/schemes/hdfs.rb', line 14
def self.extended obj
  obj.extend(obj.is_directory? ? HDFSDirectory : HDFSFile)
end
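Ruby calls `extended` on a module each time an object is extended with it, which is how the hook above can pick a more specific module per resource. The following is a minimal sketch of that dispatch pattern only. `DemoScheme`, `DemoDirectory`, `DemoFile`, and `DemoResource` are hypothetical stand-ins that are not part of IMW, and the directory check is faked with a struct field rather than a call to the HDFS.

```ruby
# Hypothetical stand-ins for illustration; none of these names come from IMW.
module DemoDirectory
  def kind
    :directory
  end
end

module DemoFile
  def kind
    :file
  end
end

module DemoScheme
  # Ruby invokes this hook after `obj.extend(DemoScheme)`, so the
  # instance methods defined below are already available on obj.
  def self.extended(obj)
    obj.extend(obj.is_directory? ? DemoDirectory : DemoFile)
  end

  # Faked check: the real module asks the HDFS instead.
  def is_directory?
    directory_flag
  end
end

DemoResource = Struct.new(:path, :directory_flag)

dir  = DemoResource.new('/data', true).extend(DemoScheme)
file = DemoResource.new('/data/part-00000', false).extend(DemoScheme)

puts dir.kind   # prints "directory"
puts file.kind  # prints "file"
```

Because the choice is made once at extension time, a resource whose type later changes on the filesystem would keep the module it was first given.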
.fs(command, *args) {|String| ... } ⇒ String
Execute command with args on the Hadoop Distributed Filesystem (HDFS).
If passed a block, yield each line of the output from the command, else just return the output.
Try running ‘hadoop fs -help’ for more information.
# File 'lib/imw/resources/schemes/hdfs.rb', line 144
def self.fs command, *args
  command_string = "#{executable} fs -#{command} #{args.compact.map(&:to_str).join(' ')}"
  command_string += " 2>&1" if command == :count # FIXME or else it just spams the screen when we do HDFS#refresh!
  output = `#{command_string}`.chomp
  if block_given?
    output.split("\n").each do |line|
      yield line
    end
  else
    output
  end
end
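To make the behaviour of `.fs` easier to follow without a Hadoop installation, here is a sketch that separates its two steps: assembling the command line, and either yielding lines or returning the output. `build_fs_command` and `lines_or_output` are hypothetical helper names, and the executable is passed in explicitly because the source shown here does not say how `executable` is resolved. Nothing is run in a shell.

```ruby
# Hypothetical helper: assembles the same string that .fs would pass to the shell.
# nil arguments are dropped, and stderr is merged into stdout only for :count.
def build_fs_command(executable, command, *args)
  command_string = "#{executable} fs -#{command} #{args.compact.map(&:to_str).join(' ')}"
  command_string += ' 2>&1' if command == :count
  command_string
end

# Hypothetical helper: yields each line when a block is given,
# otherwise returns the whole output unchanged.
def lines_or_output(output)
  return output unless block_given?

  output.split("\n").each { |line| yield line }
end

puts build_fs_command('hadoop', :ls, '/data', nil)  # prints "hadoop fs -ls /data"
puts build_fs_command('hadoop', :count, '/data')    # prints "hadoop fs -count /data 2>&1"

lines_or_output("first\nsecond") { |line| puts "line: #{line}" }
```

Arguments are interpolated into the command string unescaped, so a path containing spaces or shell metacharacters would need quoting by the caller, for example with `Shellwords.escape` from Ruby's standard library.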
Instance Method Details
#cp(new_uri) ⇒ IMW::Resource
Copy this resource to the new_uri.
# File 'lib/imw/resources/schemes/hdfs.rb', line 30
def cp new_uri
  IMW::Transforms::Transferer.new(:cp, self, new_uri).transfer!
end
#exist? ⇒ true, false Also known as: exists?
Does this path exist on the HDFS?
# File 'lib/imw/resources/schemes/hdfs.rb', line 59
def exist?
  return @exist unless @exist.nil?
  refresh!
  @exist
end
#is_directory? ⇒ true, false
Is this resource an HDFS directory?
# File 'lib/imw/resources/schemes/hdfs.rb', line 111
def is_directory?
  exist? && num_dirs > 0
end
#mv(new_uri) ⇒ IMW::Resource
Move this resource to the new_uri.
# File 'lib/imw/resources/schemes/hdfs.rb', line 38
def mv new_uri
  IMW::Transforms::Transferer.new(:mv, self, new_uri).transfer!
end
#num_dirs ⇒ Fixnum
Return the number of directories contained at or below this path on the HDFS.
This value is cached. Call #refresh! to refresh the cache manually.
# File 'lib/imw/resources/schemes/hdfs.rb', line 87
def num_dirs
  return @num_dirs unless @num_dirs.nil?
  refresh!
  should_exist!("Cannot report number of directories.")
  @num_dirs
end
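The caching used by `#num_dirs`, `#num_files`, `#size`, and `#exist?` follows one pattern: return the instance variable if it is non-nil, otherwise refresh and then return it. A minimal sketch of that pattern follows. `CachedCounter` is a hypothetical class, and the hard-coded value stands in for what the real module reads from the HDFS.

```ruby
# Hypothetical class for illustration; not part of IMW.
class CachedCounter
  attr_reader :fetches

  def initialize
    @fetches  = 0    # how many times the "filesystem" was consulted
    @num_dirs = nil  # nil means "not fetched yet"
  end

  # Returns the cached value, fetching it only on first use.
  def num_dirs
    return @num_dirs unless @num_dirs.nil?

    refresh!
    @num_dirs
  end

  # Forces a re-fetch and returns self, like the documented #refresh!.
  def refresh!
    @fetches += 1
    @num_dirs = 3 # placeholder for a value read from the filesystem
    self
  end
end

counter = CachedCounter.new
counter.num_dirs
counter.num_dirs
puts counter.fetches  # prints 1: the second call was served from the cache
counter.refresh!
puts counter.fetches  # prints 2
```

Testing with `nil?` rather than `||=` matters here because the documented `refresh!` stores `false` for a missing path, and `||=` would treat that cached `false` as "not yet fetched" and hit the filesystem again on every call.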
#num_files ⇒ Fixnum
Return the number of files contained at or below this path on the HDFS.
This value is cached. Call #refresh! to refresh the cache manually.
# File 'lib/imw/resources/schemes/hdfs.rb', line 101
def num_files
  return @num_files unless @num_files.nil?
  refresh!
  should_exist!("Cannot report number of files.")
  @num_files
end
#on_hdfs? ⇒ true, false Also known as: is_hdfs?
Is this resource an HDFS resource?
# File 'lib/imw/resources/schemes/hdfs.rb', line 21
def on_hdfs?
  true
end
#refresh! ⇒ IMW::Resource
Refresh the cached file properties.
# File 'lib/imw/resources/schemes/hdfs.rb', line 118
def refresh!
  response = HDFS.fs(:count, path)
  if response.blank? || response =~ /^Can not find listing for/
    @exist = false
    @num_dirs, @num_files, @size, @hdfs_path = false, false, false, false
  else
    @exist = true
    parts = response.split
    @num_dirs, @num_files, @size = parts[0..2].map(&:to_i)
    @hdfs_path = parts.last
  end
  self
end
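The parsing step in `#refresh!` relies on `hadoop fs -count` printing the directory count, file count, content size, and path as whitespace-separated columns. The sketch below isolates that parsing so it can be tried without a cluster. `CountResult` and `parse_count_response` are hypothetical names, the sample line is invented for illustration, and `blank?` (which is not core Ruby and is presumably supplied by a dependency) is replaced with a plain nil/empty check.

```ruby
# Hypothetical value object; the documented method stores these in instance variables.
CountResult = Struct.new(:exist, :num_dirs, :num_files, :size, :hdfs_path)

# Parses one line of `hadoop fs -count` style output.
# An empty response or a "Can not find listing for" message means the path is missing.
def parse_count_response(response)
  if response.nil? || response.strip.empty? || response =~ /^Can not find listing for/
    return CountResult.new(false, false, false, false, false)
  end

  parts = response.split
  num_dirs, num_files, size = parts[0..2].map(&:to_i)
  CountResult.new(true, num_dirs, num_files, size, parts.last)
end

# Invented sample output, not captured from a real cluster.
sample = '           3           12             4096 /user/demo/data'
result = parse_count_response(sample)

puts result.num_dirs   # prints 3
puts result.num_files  # prints 12
puts result.size       # prints 4096
puts result.hdfs_path  # prints "/user/demo/data"
```

Splitting on whitespace and taking `parts.last` as the path assumes the path itself contains no spaces.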
#rm(options = {}) ⇒ Object Also known as: rm!
Delete this resource from the HDFS.
# File 'lib/imw/resources/schemes/hdfs.rb', line 45
def rm options={}
  should_exist!("Cannot delete.")
  args = [:rm]
  args << '-skipTrash' if options[:skip] || options[:skip_trash] || options[:skipTrash]
  args << path
  HDFS.fs(*args)
  self
end
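As the listing shows, `#rm` accepts any of three option keys to bypass the trash. The sketch below reproduces only the argument-building step so the effect of each key is visible. `rm_args` is a hypothetical helper, and it returns the argument list instead of calling the HDFS.

```ruby
# Hypothetical helper mirroring how #rm assembles its arguments.
# Any truthy :skip, :skip_trash, or :skipTrash option adds the -skipTrash flag.
def rm_args(path, options = {})
  args = [:rm]
  args << '-skipTrash' if options[:skip] || options[:skip_trash] || options[:skipTrash]
  args << path
  args
end

p rm_args('/user/demo/old')                    # prints [:rm, "/user/demo/old"]
p rm_args('/user/demo/old', skip_trash: true)  # prints [:rm, "-skipTrash", "/user/demo/old"]
```

The first element is the command and the rest are its arguments, which matches the `command, *args` signature of `.fs`.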
#size ⇒ Fixnum
Return the size (in bytes) of this resource on the HDFS.
This value is cached. Call #refresh! to refresh the cache manually.
# File 'lib/imw/resources/schemes/hdfs.rb', line 73
def size
  return @size unless @size.nil?
  refresh!
  should_exist!("Cannot report size")
  @size
end