Module: Rex::IO::Stream
Defined in: lib/rex/io/stream.rb
Overview
This mixin is an abstract representation of a streaming connection. Streams extend classes that must implement the following methods:
syswrite(buffer)
sysread(length)
shutdown(how)
close
peerinfo
localinfo
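The contract above can be illustrated with a small stand-in. This is a sketch only: `SketchStream`, `PipeBackedStream`, `send_all` and `recv_some` are hypothetical names invented for the example, the module is not Rex::IO::Stream, and a pipe is used instead of a socket so the example runs without Rex installed.

```ruby
# Hypothetical stand-in for the mixin idea; this is NOT Rex::IO::Stream.
module SketchStream
  # Built only on primitives that the including class must provide.
  def send_all(buffer)
    sent = 0
    while sent < buffer.bytesize
      sent += syswrite(buffer.byteslice(sent, buffer.bytesize - sent))
    end
    sent
  end

  def recv_some(length)
    sysread(length)
  end
end

# A host class that supplies the six required methods by delegating to a pipe.
class PipeBackedStream
  include SketchStream

  def initialize
    @reader, @writer = IO.pipe
  end

  def syswrite(buffer)
    @writer.syswrite(buffer)
  end

  def sysread(length)
    @reader.sysread(length)
  end

  def shutdown(_how = nil)
    @writer.close unless @writer.closed?
  end

  def close
    [@reader, @writer].each { |io| io.close unless io.closed? }
  end

  def peerinfo
    'pipe (no peer address)'
  end

  def localinfo
    'pipe (no local address)'
  end
end

stream = PipeBackedStream.new
stream.send_all('hello')
puts stream.recv_some(5)
stream.close
```

The point of the pattern is that the module owns the convenience methods while the including class owns the transport.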
Instance Method Summary
- #<<(buf) ⇒ Object
  This method writes the supplied buffer to the stream by calling the write routine.
- #>> ⇒ Object
  This method calls get_once() to read pending data from the socket.
- #close ⇒ Object
- #def_block_size ⇒ Object
  The default block size to read in chunks from the wire.
- #def_max_loops ⇒ Object
  The maximum number of read loops to perform before returning to the caller.
- #def_read_loop_timeout ⇒ Object
  The default number of seconds to wait while in a read loop after read data has been found.
- #def_read_timeout ⇒ Object
  The default number of seconds to wait for a read operation to time out.
- #def_write_timeout ⇒ Object
  The default number of seconds to wait for a write operation to time out.
- #fd ⇒ Object
  This method returns the selectable file descriptor, or self by default.
- #get(timeout = nil, ltimeout = def_read_loop_timeout, opts = {}) ⇒ Object
  This method reads as much data as it can from the wire given a maximum timeout.
- #get_once(length = -1, timeout = def_read_timeout) ⇒ Object
  This method emulates the behavior of Pex::Socket::Recv in MSF2.
- #has_read_data?(timeout = nil) ⇒ Boolean
  Polls the stream to see if there is any read data available.
- #initialize_synchronization ⇒ Object
  Initialize synchronization for this stream.
- #put(buf, opts = {}) ⇒ Object
  This method writes the full contents of the supplied buffer, optionally with a timeout.
- #read(length = nil, opts = {}) ⇒ Object
  This method reads data of the supplied length from the stream.
- #synchronize_access ⇒ Object
  Synchronize non-state changing access to the stream such as read and write operations.
- #synchronize_update ⇒ Object
  Synchronize state changing operations to the stream such as closing it.
- #timed_read(length = nil, wait = def_read_timeout, opts = {}) ⇒ Object
  This method reads from the stream, optionally timing out after a period of time.
- #timed_write(buf, wait = def_write_timeout, opts = {}) ⇒ Object
  This method writes to the stream, optionally timing out after a period of time.
- #write(buf, opts = {}) ⇒ Object
  This method writes the supplied buffer to the stream.
Instance Method Details
#<<(buf) ⇒ Object
This method writes the supplied buffer to the stream by calling the write routine.
    # File 'lib/rex/io/stream.rb', line 148
    def <<(buf)
      return write(buf.to_s)
    end
#>> ⇒ Object
This method calls get_once() to read pending data from the socket.

    # File 'lib/rex/io/stream.rb', line 155
    def >>
      get_once
    end
#close ⇒ Object
    # File 'lib/rex/io/stream.rb', line 37
    def close
      self.close_resource = true
      synchronize_update {
        super
      }
    end
#def_block_size ⇒ Object
The default block size to read in chunks from the wire.
    # File 'lib/rex/io/stream.rb', line 324
    def def_block_size
      16384
    end
#def_max_loops ⇒ Object
The maximum number of read loops to perform before returning to the caller.
    # File 'lib/rex/io/stream.rb', line 317
    def def_max_loops
      1024
    end
#def_read_loop_timeout ⇒ Object
The default number of seconds to wait while in a read loop after read data has been found.
    # File 'lib/rex/io/stream.rb', line 309
    def def_read_loop_timeout
      0.1
    end
#def_read_timeout ⇒ Object
The default number of seconds to wait for a read operation to time out.

    # File 'lib/rex/io/stream.rb', line 301
    def def_read_timeout
      10
    end
#def_write_timeout ⇒ Object
The default number of seconds to wait for a write operation to time out.

    # File 'lib/rex/io/stream.rb', line 294
    def def_write_timeout
      10
    end
#fd ⇒ Object
This method returns the selectable file descriptor, or self by default.
    # File 'lib/rex/io/stream.rb', line 134
    def fd
      self
    end
#get(timeout = nil, ltimeout = def_read_loop_timeout, opts = {}) ⇒ Object
This method reads as much data as it can from the wire given a maximum timeout.
    # File 'lib/rex/io/stream.rb', line 232
    def get(timeout = nil, ltimeout = def_read_loop_timeout, opts = {})
      # For those people who are used to being able to use a negative timeout!
      if (timeout and timeout.to_i < 0)
        timeout = nil
      end

      # No data in the first place? bust.
      if (has_read_data?(timeout) == false)
        return nil
      end

      buf = ""
      lps = 0
      eof = false

      # Keep looping until there is no more data to be gotten..
      while (has_read_data?(ltimeout) == true)
        # Catch EOF errors so that we can handle them properly.
        begin
          temp = read(def_block_size)
        rescue EOFError
          eof = true
        end

        # If we read zero bytes and we had data, then we've hit EOF
        if (temp and temp.length == 0)
          eof = true
        end

        # If we reached EOF and there are no bytes in the buffer we've been
        # reading into, then throw an EOF error.
        if (eof)
          # If we've already read at least some data, then it's time to
          # break out and let it be processed before throwing an EOFError.
          if (buf.length > 0)
            break
          else
            raise EOFError
          end
        end

        break if (temp == nil or temp.empty? == true)

        buf += temp
        lps += 1

        break if (lps >= def_max_loops)
      end

      # Return the entire buffer we read in
      return buf
    end
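The two-timeout read loop above can be approximated with the standard library alone. The following is a minimal sketch, not the Rex implementation: `drain` and its parameter names are hypothetical, it operates on a plain pipe, and it surfaces EOF through the `EOFError` raised by `IO#read_nonblock` rather than through a nil or empty read.

```ruby
# Hypothetical helper: wait up to +timeout+ for the first byte, then keep
# reading for as long as more data shows up within +loop_timeout+.
def drain(io, timeout = nil, loop_timeout = 0.1, block_size = 16_384, max_loops = 1024)
  # Nothing arrived within the initial timeout.
  return nil unless IO.select([io], nil, nil, timeout)

  buf = String.new
  loops = 0
  while IO.select([io], nil, nil, loop_timeout)
    begin
      chunk = io.read_nonblock(block_size)
    rescue IO::WaitReadable
      next
    rescue EOFError
      # Hand back what was already read; only raise when nothing was read.
      raise if buf.empty?
      break
    end
    buf << chunk
    loops += 1
    break if loops >= max_loops
  end
  buf
end

reader, writer = IO.pipe
writer.syswrite('hello')
writer.syswrite(' world')
puts drain(reader, 1, 0.05)
```

The short loop timeout is what lets the call return promptly once the sender goes quiet, while the first timeout controls how long the caller is willing to wait for any data at all.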
#get_once(length = -1, timeout = def_read_timeout) ⇒ Object
This method emulates the behavior of Pex::Socket::Recv in MSF2.

    # File 'lib/rex/io/stream.rb', line 216
    def get_once(length = -1, timeout = def_read_timeout)

      if (has_read_data?(timeout) == false)
        return nil
      end

      bsize = (length == -1) ? def_block_size : length
      data = read(bsize)
      raise EOFError if data.nil?
      data
    end
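As a rough illustration of the "poll, then read one block" behavior, here is a sketch against a plain pipe. `read_once` and `DEFAULT_BLOCK` are hypothetical names chosen for the example; the constant simply mirrors the 16384 returned by def_block_size.

```ruby
DEFAULT_BLOCK = 16_384 # assumption: mirrors def_block_size

# Hypothetical helper: returns nil on timeout, a string when data is
# pending, and lets EOFError propagate when the writer has closed.
def read_once(io, length = -1, timeout = 10)
  return nil unless IO.select([io], nil, nil, timeout)

  size = length == -1 ? DEFAULT_BLOCK : length
  io.read_nonblock(size)
end

reader, writer = IO.pipe
writer.syswrite('abcdef')
puts read_once(reader, 3, 1)
puts read_once(reader, -1, 1)
```

A length of -1 acts as a sentinel for "use the default block size", so callers that do not care about sizing can omit the argument.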
#has_read_data?(timeout = nil) ⇒ Boolean
Polls the stream to see if there is any read data available. Returns true if data is available for reading, otherwise false is returned.
    # File 'lib/rex/io/stream.rb', line 108
    def has_read_data?(timeout = nil)

      # Allow a timeout of "0" that waits almost indefinitely for input, this
      # mimics the behavior of Rex::ThreadSafe.select() and fixes some corner
      # cases of unintentional no-wait timeouts.
      timeout = 3600 if (timeout and timeout == 0)

      begin
        if ((rv = ::IO.select([ fd ], nil, nil, timeout)) and
            (rv[0]) and
            (rv[0][0] == fd))
          true
        else
          false
        end
      rescue ::Errno::EBADF, ::Errno::ENOTSOCK
        raise ::EOFError
      rescue StreamClosedError, ::IOError, ::EOFError, ::Errno::EPIPE
        # Return false if the socket is dead
        return false
      end
    end
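The polling idea can be tried in isolation with `IO.select`. This sketch uses a hypothetical helper name, `readable_within?`, and simplifies the error handling: every failure is reported as false, whereas the method above re-raises some errors as EOFError and remaps a timeout of 0 to 3600 seconds.

```ruby
# Hypothetical helper: true when +io+ becomes readable within +timeout+ seconds.
def readable_within?(io, timeout)
  ready = IO.select([io], nil, nil, timeout)
  !ready.nil? && ready[0].include?(io)
rescue IOError, Errno::EBADF
  # Simplification: treat a closed or invalid descriptor as "no data".
  false
end

reader, writer = IO.pipe
puts readable_within?(reader, 0.05) # nothing has been written yet
writer.syswrite('ping')
puts readable_within?(reader, 0.05) # data is now pending
```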
#initialize_synchronization ⇒ Object
Initialize synchronization for this stream. This should be used if the stream will be written to, read from, or closed from multiple threads.

    # File 'lib/rex/io/stream.rb', line 32
    def initialize_synchronization
      self.stream_lock = Rex::ReadWriteLock.new
      self.close_resource = false
    end
#put(buf, opts = {}) ⇒ Object
This method writes the full contents of the supplied buffer, optionally with a timeout.
    # File 'lib/rex/io/stream.rb', line 191
    def put(buf, opts = {})
      return 0 if (buf == nil or buf.length == 0)

      send_len = buf.length
      send_idx = 0
      wait = opts['Timeout'] || 0

      # Keep writing until our send length drops to zero
      while (send_idx < send_len)
        curr_len = timed_write(buf[send_idx, buf.length-send_idx], wait, opts)

        # If the write operation failed due to an IOError, then we fail.
        return buf.length - send_len if (curr_len == nil)

        send_len -= curr_len
        send_idx += curr_len
      end

      return buf.length - send_len
    end
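A write-until-done loop of this general kind can be sketched without Rex. Everything named here is hypothetical: `ThrottledWriter` is a fake sink that accepts only a few bytes per call (the limit is assumed to be positive) so that partial writes actually occur, and `put_all` is one way to write such a loop, not a transcription of the method above.

```ruby
# Hypothetical sink that accepts at most +limit+ bytes per call.
class ThrottledWriter
  attr_reader :received

  def initialize(limit)
    @limit = limit
    @received = String.new
  end

  # Returns the number of bytes accepted, like a partial socket write.
  def write_some(data)
    accepted = data.byteslice(0, @limit)
    @received << accepted
    accepted.bytesize
  end
end

# Hypothetical helper: keep writing the unsent tail until nothing is left.
# Returns the number of bytes written.
def put_all(writer, buf)
  return 0 if buf.nil? || buf.empty?

  sent = 0
  while sent < buf.bytesize
    n = writer.write_some(buf.byteslice(sent, buf.bytesize - sent))
    break if n.nil? # the underlying write failed
    sent += n
  end
  sent
end

sink = ThrottledWriter.new(4)
puts put_all(sink, 'hello world')
puts sink.received
```

Tracking a single `sent` offset keeps the loop condition and the return value tied to the same counter.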
#read(length = nil, opts = {}) ⇒ Object
This method reads data of the supplied length from the stream.
    # File 'lib/rex/io/stream.rb', line 88
    def read(length = nil, opts = {})
      synchronize_access {
        begin
          return fd.read_nonblock( length )
        rescue ::Errno::EAGAIN, ::Errno::EWOULDBLOCK
          return nil if self.close_resource
          # Sleep for a half a second, or until we can read again
          Rex::ThreadSafe.select( [ fd ], nil, nil, 0.5 )
          # Decrement the block size to handle full sendQs better
          retry
        rescue ::IOError, ::Errno::EPIPE
          return nil
        end
      }
    end
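The retry-on-would-block pattern can be shown with plain `IO` objects. In this sketch `patient_read` is a hypothetical name, `IO.select` stands in for Rex::ThreadSafe.select, and the close_resource check is omitted.

```ruby
# Hypothetical helper: non-blocking read that waits and retries when the
# descriptor has no data yet, and returns nil on EOF or a broken pipe.
def patient_read(io, length, poll = 0.5)
  io.read_nonblock(length)
rescue IO::WaitReadable
  # Sleep for up to +poll+ seconds, or until the descriptor is readable.
  IO.select([io], nil, nil, poll)
  retry
rescue IOError, Errno::EPIPE
  nil
end

reader, writer = IO.pipe
producer = Thread.new do
  sleep 0.05
  writer.syswrite('late data')
end
puts patient_read(reader, 64)
producer.join
```

Because EOFError is a subclass of IOError, a closed writer shows up as nil here rather than as an exception.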
#synchronize_access ⇒ Object
Synchronize non-state changing access to the stream such as read and write operations. If synchronization has not been initialized, this doesn’t do anything.
    # File 'lib/rex/io/stream.rb', line 333
    def synchronize_access
      self.stream_lock.lock_read unless self.stream_lock.nil?
      begin
        yield
      ensure
        self.stream_lock.unlock_read unless self.stream_lock.nil?
      end
    end
#synchronize_update ⇒ Object
Synchronize state changing operations to the stream such as closing it. If synchronization has not been initialized, this doesn’t do anything.
    # File 'lib/rex/io/stream.rb', line 346
    def synchronize_update
      self.stream_lock.lock_write unless self.stream_lock.nil?
      begin
        yield
      ensure
        self.stream_lock.unlock_write unless self.stream_lock.nil?
      end
    end
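Both synchronize methods follow the same shape: take the lock only if one was set up, yield, and release in an ensure clause. The sketch below shows that shape with a plain `Mutex` as a stand-in, since Ruby's standard library has no read/write lock; `GuardedResource`, `guarded` and `locked?` are hypothetical names, and the distinction between read and write locking is deliberately left out.

```ruby
# Hypothetical class illustrating opt-in locking around a block.
class GuardedResource
  def initialize
    @lock = nil # no synchronization until explicitly requested
  end

  def initialize_synchronization
    @lock = Mutex.new # stand-in; the source uses Rex::ReadWriteLock
  end

  # Runs the block under the lock when one exists, otherwise runs it bare.
  def guarded
    @lock&.lock
    begin
      yield
    ensure
      # Released even when the block raises.
      @lock&.unlock
    end
  end

  def locked?
    !@lock.nil? && @lock.locked?
  end
end

res = GuardedResource.new
puts res.guarded { :unsynchronized }
res.initialize_synchronization
puts res.guarded { res.locked? }
```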
#timed_read(length = nil, wait = def_read_timeout, opts = {}) ⇒ Object
This method reads from the stream, optionally timing out after a period of time.
    # File 'lib/rex/io/stream.rb', line 177
    def timed_read(length = nil, wait = def_read_timeout, opts = {})
      if (wait and wait > 0)
        Timeout.timeout(wait) {
          return read(length, opts)
        }
      else
        return read(length, opts)
      end
    end
#timed_write(buf, wait = def_write_timeout, opts = {}) ⇒ Object
This method writes to the stream, optionally timing out after a period of time.
    # File 'lib/rex/io/stream.rb', line 163
    def timed_write(buf, wait = def_write_timeout, opts = {})
      if (wait and wait > 0)
        Timeout.timeout(wait) {
          return write(buf, opts)
        }
      else
        return write(buf, opts)
      end
    end
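timed_read and timed_write share one idea: wrap the operation in `Timeout.timeout` only when a positive wait is given. A small sketch of that idea follows, with `with_deadline` as a hypothetical helper name.

```ruby
require 'timeout'

# Hypothetical helper: run the block under a deadline when +wait+ is a
# positive number; otherwise run it with no deadline at all.
def with_deadline(wait)
  if wait && wait > 0
    Timeout.timeout(wait) { yield }
  else
    yield
  end
end

puts with_deadline(nil) { :no_deadline }

begin
  with_deadline(0.05) { sleep 1 }
rescue Timeout::Error
  puts 'timed out'
end
```

A wait of nil or 0 therefore means "no deadline", which is why put can pass 0 as its default.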
#write(buf, opts = {}) ⇒ Object
This method writes the supplied buffer to the stream. It intelligently reduces the size of supplied buffers so that Ruby doesn’t get into a potential global thread blocking state when used on blocking sockets. That is, this method sends the supplied buffer in chunks of at most 32768 bytes.

    # File 'lib/rex/io/stream.rb', line 51
    def write(buf, opts = {})
      total_sent = 0
      total_length = buf.length
      block_size = 32768

      synchronize_access {
        begin
          while( total_sent < total_length )
            s = Rex::ThreadSafe.select( nil, [ fd ], nil, 0.2 )
            if( s == nil || s[0] == nil )
              next
            end
            data = buf[total_sent, block_size]
            sent = fd.write_nonblock( data )
            if sent > 0
              total_sent += sent
            end
          end
        rescue ::Errno::EAGAIN, ::Errno::EWOULDBLOCK
          return nil if self.close_resource
          # Sleep for a half a second, or until we can write again
          Rex::ThreadSafe.select( nil, [ fd ], nil, 0.5 )
          # Decrement the block size to handle full sendQs better
          block_size = 1024
          # Try to write the data again
          retry
        rescue ::IOError, ::Errno::EPIPE
          return nil
        end
      }

      total_sent
    end
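The chunking strategy can be reproduced with `IO#write_nonblock` on a pipe. This is a sketch under assumptions: `chunked_write` is a hypothetical name, `IO.select` replaces Rex::ThreadSafe.select, byte offsets are used instead of character offsets, and locking and the close_resource check are omitted.

```ruby
# Hypothetical helper: send +buf+ in slices of at most +block_size+ bytes,
# shrinking the slice size if the descriptor reports it would block.
def chunked_write(io, buf, block_size = 32_768)
  total = 0
  while total < buf.bytesize
    # Wait briefly for the descriptor to become writable.
    next unless IO.select(nil, [io], nil, 0.2)

    begin
      total += io.write_nonblock(buf.byteslice(total, block_size))
    rescue IO::WaitWritable
      # The send queue is full: back off to smaller chunks and wait.
      block_size = 1024
      IO.select(nil, [io], nil, 0.5)
    end
  end
  total
rescue IOError, Errno::EPIPE
  nil
end

reader, writer = IO.pipe
payload = 'x' * 100_000
received = String.new
consumer = Thread.new do
  received << reader.readpartial(65_536) while received.bytesize < payload.bytesize
end
puts chunked_write(writer, payload)
consumer.join
puts received.bytesize
```

Capping each slice bounds the time spent in any single write call, so one large buffer cannot monopolize the interpreter while the peer is slow to drain its end.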