Module: Rex::IO::Stream

Defined in:
lib/rex/io/stream.rb

Overview

This mixin is an abstract representation of a streaming connection. Streams extend classes that must implement the following methods:

syswrite(buffer)
sysread(length)
shutdown(how)
close
peerinfo
localinfo

Instance Method Summary collapse

Instance Method Details

#<<(buf) ⇒ Object

This method writes the supplied buffer to the stream by calling the write routine.



148
149
150
# File 'lib/rex/io/stream.rb', line 148

def <<(buf)
  return write(buf.to_s)
end

#>>Object

This method calls get_once() to read pending data from the socket



155
156
157
# File 'lib/rex/io/stream.rb', line 155

def >>
  get_once
end

#closeObject



37
38
39
40
41
42
# File 'lib/rex/io/stream.rb', line 37

def close
  self.close_resource = true
  synchronize_update {
    super
  }
end

#def_block_sizeObject

The default block size to read in chunks from the wire.



324
325
326
# File 'lib/rex/io/stream.rb', line 324

def def_block_size
  16384
end

#def_max_loopsObject

The maximum number of read loops to perform before returning to the caller.



317
318
319
# File 'lib/rex/io/stream.rb', line 317

def def_max_loops
  1024
end

#def_read_loop_timeoutObject

The default number of seconds to wait while in a read loop after read data has been found.



309
310
311
# File 'lib/rex/io/stream.rb', line 309

def def_read_loop_timeout
  0.1
end

#def_read_timeoutObject

The default number of seconds to wait for a read operation to timeout.



301
302
303
# File 'lib/rex/io/stream.rb', line 301

def def_read_timeout
  10
end

#def_write_timeoutObject

The default number of seconds to wait for a write operation to timeout.



294
295
296
# File 'lib/rex/io/stream.rb', line 294

def def_write_timeout
  10
end

#fdObject

This method returns the selectable file descriptor, or self by default.



134
135
136
# File 'lib/rex/io/stream.rb', line 134

def fd
  self
end

#get(timeout = nil, ltimeout = def_read_loop_timeout, opts = {}) ⇒ Object

This method reads as much data as it can from the wire given a maximum timeout.



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/rex/io/stream.rb', line 232

def get(timeout = nil, ltimeout = def_read_loop_timeout, opts = {})
  # For those people who are used to being able to use a negative timeout!
  if (timeout and timeout.to_i < 0)
    timeout = nil
  end

  # No data in the first place? bust.
  if (has_read_data?(timeout) == false)
    return nil
  end

  buf = ""
  lps = 0
  eof = false

  # Keep looping until there is no more data to be gotten..
  while (has_read_data?(ltimeout) == true)
    # Catch EOF errors so that we can handle them properly.
    begin
      temp = read(def_block_size)
    rescue EOFError
      eof = true
    end

    # If we read zero bytes and we had data, then we've hit EOF
    if (temp and temp.length == 0)
      eof = true
    end

    # If we reached EOF and there are no bytes in the buffer we've been
    # reading into, then throw an EOF error.
    if (eof)
      # If we've already read at least some data, then it's time to
      # break out and let it be processed before throwing an EOFError.
      if (buf.length > 0)
        break
      else
        raise EOFError
      end
    end

    break if (temp == nil or temp.empty? == true)

    buf += temp
    lps += 1

    break if (lps >= def_max_loops)
  end

  # Return the entire buffer we read in
  return buf
end

#get_once(length = -1,, timeout = def_read_timeout) ⇒ Object

This method emulates the behavior of Pex::Socket::Recv in MSF2

Raises:

  • (EOFError)


216
217
218
219
220
221
222
223
224
225
226
# File 'lib/rex/io/stream.rb', line 216

def get_once(length = -1, timeout = def_read_timeout)

  if (has_read_data?(timeout) == false)
    return nil
  end

  bsize = (length == -1) ? def_block_size : length
  data  = read(bsize)
  raise EOFError if data.nil?
  data
end

#has_read_data?(timeout = nil) ⇒ Boolean

Polls the stream to see if there is any read data available. Returns true if data is available for reading, otherwise false is returned.

Returns:

  • (Boolean)


108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/rex/io/stream.rb', line 108

def has_read_data?(timeout = nil)

  # Allow a timeout of "0" that waits almost indefinitely for input, this
  # mimics the behavior of Rex::ThreadSafe.select() and fixes some corner
  # cases of unintentional no-wait timeouts.
  timeout = 3600 if (timeout and timeout == 0)

  begin
    if ((rv = ::IO.select([ fd ], nil, nil, timeout)) and
        (rv[0]) and
        (rv[0][0] == fd))
      true
    else
      false
    end
  rescue ::Errno::EBADF, ::Errno::ENOTSOCK
    raise ::EOFError
  rescue StreamClosedError, ::IOError, ::EOFError, ::Errno::EPIPE
    #  Return false if the socket is dead
    return false
  end
end

#initialize_synchronizationObject

Initialize synchronization for this stream. This should be used if the stream will be written to, read or closed from multiple threads.



32
33
34
35
# File 'lib/rex/io/stream.rb', line 32

def initialize_synchronization
  self.stream_lock = Rex::ReadWriteLock.new
  self.close_resource = false
end

#put(buf, opts = {}) ⇒ Object

This method writes the full contents of the supplied buffer, optionally with a timeout.



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/rex/io/stream.rb', line 191

def put(buf, opts = {})
  return 0 if (buf == nil or buf.length == 0)

  send_len = buf.length
  send_idx = 0
  wait     = opts['Timeout'] || 0

  # Keep writing until our send length drops to zero
  while (send_idx < send_len)
    curr_len  = timed_write(buf[send_idx, buf.length-send_idx], wait, opts)

    # If the write operation failed due to an IOError, then we fail.
    return buf.length - send_len if (curr_len == nil)

    send_len -= curr_len
    send_idx += curr_len
  end

  return buf.length - send_len
end

#read(length = nil, opts = {}) ⇒ Object

This method reads data of the supplied length from the stream.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/rex/io/stream.rb', line 88

def read(length = nil, opts = {})
  synchronize_access {
    begin
      return fd.read_nonblock( length )
    rescue ::Errno::EAGAIN, ::Errno::EWOULDBLOCK
      return nil if self.close_resource
      # Sleep for a half a second, or until we can read again
      Rex::ThreadSafe.select( [ fd ], nil, nil, 0.5 )
      # Decrement the block size to handle full sendQs better
      retry
    rescue ::IOError, ::Errno::EPIPE
      return nil
    end
  }
end

#synchronize_accessObject

Synchronize non-state changing access to the stream such as read and write operations. If synchronization has not been initialized, this doesn’t do anything.



333
334
335
336
337
338
339
340
# File 'lib/rex/io/stream.rb', line 333

def synchronize_access
  self.stream_lock.lock_read unless self.stream_lock.nil?
  begin
    yield
  ensure
    self.stream_lock.unlock_read unless self.stream_lock.nil?
  end
end

#synchronize_updateObject

Synchronize state changing operations to the stream such as closing it. If synchronization has not been initialized, this doesn’t do anything.



346
347
348
349
350
351
352
353
# File 'lib/rex/io/stream.rb', line 346

def synchronize_update
  self.stream_lock.lock_write unless self.stream_lock.nil?
  begin
    yield
  ensure
    self.stream_lock.unlock_write unless self.stream_lock.nil?
  end
end

#timed_read(length = nil, wait = def_read_timeout, opts = {}) ⇒ Object

This method reads from the stream, optionally timing out after a period of time.



177
178
179
180
181
182
183
184
185
# File 'lib/rex/io/stream.rb', line 177

def timed_read(length = nil, wait = def_read_timeout, opts = {})
  if (wait and wait > 0)
    Timeout.timeout(wait) {
      return read(length, opts)
    }
  else
    return read(length, opts)
  end
end

#timed_write(buf, wait = def_write_timeout, opts = {}) ⇒ Object

This method writes to the stream, optionally timing out after a period of time.



163
164
165
166
167
168
169
170
171
# File 'lib/rex/io/stream.rb', line 163

def timed_write(buf, wait = def_write_timeout, opts = {})
  if (wait and wait > 0)
    Timeout.timeout(wait) {
      return write(buf, opts)
    }
  else
    return write(buf, opts)
  end
end

#write(buf, opts = {}) ⇒ Object

This method writes the supplied buffer to the stream. This method intelligent reduces the size of supplied buffers so that ruby doesn’t get into a potential global thread blocking state when used on blocking sockets. That is, this method will send the supplied buffer in chunks of, at most, 32768 bytes.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/rex/io/stream.rb', line 51

def write(buf, opts = {})
  total_sent   = 0
  total_length = buf.length
  block_size   = 32768

  synchronize_access {
    begin
      while( total_sent < total_length )
        s = Rex::ThreadSafe.select( nil, [ fd ], nil, 0.2 )
        if( s == nil || s[0] == nil )
          next
        end
        data = buf[total_sent, block_size]
        sent = fd.write_nonblock( data )
        if sent > 0
          total_sent += sent
        end
      end
    rescue ::Errno::EAGAIN, ::Errno::EWOULDBLOCK
      return nil if self.close_resource
      # Sleep for a half a second, or until we can write again
      Rex::ThreadSafe.select( nil, [ fd ], nil, 0.5 )
      # Decrement the block size to handle full sendQs better
      block_size = 1024
      # Try to write the data again
      retry
    rescue ::IOError, ::Errno::EPIPE
      return nil
    end
  }

  total_sent
end