Class: Rex::IO::RingBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/rex/io/ring_buffer.rb

Direct Known Subclasses

RingBufferUdp

Defined Under Namespace

Classes: Stream

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket, opts = {}) ⇒ RingBuffer

Create a new ring buffer



32
33
34
35
36
37
38
39
40
# File 'lib/rex/io/ring_buffer.rb', line 32

# Create a new ring buffer around +socket+.
#
# @param socket [Object] the socket or IO-like object this buffer is bound to
# @param opts [Hash] optional settings; +:size+ is the number of queue slots
#   (defaults to 4096)
def initialize(socket, opts={})
  # Who we read from and how many slots we keep.
  self.fd   = socket
  self.size = opts[:size] || (1024 * 4)

  # An empty ring: every slot is nil and all counters sit at zero.
  self.queue = Array.new(size)
  self.seq = self.beg = self.cur = 0

  # Guards the queue and its counters.
  self.mutex = Mutex.new
end

Instance Attribute Details

#begObject

The index of the earliest data fragment in the ring



24
25
26
# File 'lib/rex/io/ring_buffer.rb', line 24

# Reader for the queue index of the oldest entry still held in the ring.
# Starts at 0 and is advanced (with wrap-around) by store_data once the
# ring is full and old entries get overwritten.
#
# @return [Integer] index into the queue array
def beg
  @beg
end

#curObject

The array index (into the queue) of the most recently stored data fragment in the ring



25
26
27
# File 'lib/rex/io/ring_buffer.rb', line 25

# Reader for the queue index of the most recently stored entry.
# store_data writes the next entry at (cur + 1) % size and then updates
# this value to that slot.
#
# @return [Integer] index into the queue array
def cur
  @cur
end

#fdObject

The associated socket or IO object for this ring buffer



21
22
23
# File 'lib/rex/io/ring_buffer.rb', line 21

# Reader for the socket or IO-like object handed to the constructor.
# The monitor thread reads from it and put writes to it.
#
# @return [Object] the associated socket / IO object
def fd
  @fd
end

#monitorObject

The thread handle of the built-in monitor when used



26
27
28
# File 'lib/rex/io/ring_buffer.rb', line 26

# Reader for the built-in monitor thread handle. It is assigned by
# start_monitor and reset to nil by stop_monitor.
#
# @return [Thread, nil] the monitor thread, or nil when none has been started
def monitor
  @monitor
end

#monitor_thread_errorObject

:nodoc: #



27
28
29
# File 'lib/rex/io/ring_buffer.rb', line 27

# Reader for the exception captured by the monitor thread. The thread's
# read loop rescues any exception and stores it here before exiting.
#
# @return [Exception, nil] the captured exception, or nil if none was recorded
def monitor_thread_error
  @monitor_thread_error
end

#mutexObject

The mutex locking access to the queue



23
24
25
# File 'lib/rex/io/ring_buffer.rb', line 23

# Reader for the mutex that serializes access to the queue and its
# counters (used by store_data, read_data, clear_data and base_sequence).
#
# @return [Mutex]
def mutex
  @mutex
end

#queueObject

The data queue, essentially an array of two-element arrays, containing a sequence and data buffer



19
20
21
# File 'lib/rex/io/ring_buffer.rb', line 19

# Reader for the ring storage: an array of +size+ slots. An unused slot is
# nil; a used slot is a two-element array of [sequence number, data].
#
# @return [Array<Array, nil>]
def queue
  @queue
end

#seqObject

The sequence number assigned to the most recently stored data fragment (zero while nothing has been stored)



20
21
22
# File 'lib/rex/io/ring_buffer.rb', line 20

# Reader for the sequence counter. store_data increments it and tags the
# new entry with the incremented value, so it holds the sequence number of
# the most recently stored entry (0 while nothing has been stored).
#
# @return [Integer]
def seq
  @seq
end

#sizeObject

The number of available slots in the queue



22
23
24
# File 'lib/rex/io/ring_buffer.rb', line 22

# Reader for the number of slots in the ring. Set by the constructor from
# opts[:size], falling back to 4096.
#
# @return [Integer]
def size
  @size
end

Instance Method Details

#base_sequenceObject

The base_sequence method returns the earliest sequence number in the queue, or zero if the queue is empty. The value stays at the first stored sequence number until all slots are filled and the ring rotates.



180
181
182
183
184
185
# File 'lib/rex/io/ring_buffer.rb', line 180

# Report the sequence number of the oldest entry still in the ring.
#
# @return [Integer] the oldest entry's sequence number, or 0 when the slot
#   at +beg+ is empty (i.e. nothing has been stored)
def base_sequence
  mutex.synchronize do
    oldest = queue[beg]
    oldest ? oldest[0] : 0
  end
end

#clear_dataObject

The clear_data method wipes the ring buffer



89
90
91
92
93
94
95
96
# File 'lib/rex/io/ring_buffer.rb', line 89

# Wipe the ring buffer: zero the sequence counter and both indexes, then
# replace the queue with a fresh array of +size+ empty slots. Runs under
# the mutex.
#
# @return [Array] the new, empty queue
def clear_data
  mutex.synchronize do
    # Assigned right-to-left: seq, then beg, then cur.
    self.cur = self.beg = self.seq = 0
    self.queue = Array.new(size)
  end
end

#create_streamObject

The create_stream method assigns an IO::Socket compatible object to the ring buffer



198
199
200
# File 'lib/rex/io/ring_buffer.rb', line 198

# Build a new Stream wrapper around this ring buffer.
#
# @return [Stream] a Stream constructed with this buffer as its only argument
def create_stream
  Stream.new(self)
end

#inspectObject



42
43
44
# File 'lib/rex/io/ring_buffer.rb', line 42

# Short, human-readable summary of the buffer: slot count, bound socket,
# sequence counter and both ring indexes. The queue contents are
# deliberately left out.
#
# @return [String]
def inspect
  format(
    "#<Rex::IO::RingBuffer @size=%s @fd=%s @seq=%s @beg=%s @cur=%s>",
    size, fd, seq, beg, cur
  )
end

#last_sequenceObject

The last_sequence method returns the “next” sequence number where new data will be available.



191
192
193
# File 'lib/rex/io/ring_buffer.rb', line 191

# Report the current value of the sequence counter, i.e. the sequence
# number store_data assigned to the most recently stored entry.
#
# @return [Integer]
def last_sequence
  seq
end

#monitor_threadObject

The built-in monitor thread (normally unused with Metasploit)



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/rex/io/ring_buffer.rb', line 64

# Spawn the built-in monitor: a thread that keeps pulling data from +fd+
# and pushing every non-empty read into the ring via store_data.
#
# The loop runs for as long as +fd+ is set. Any exception raised while
# reading or storing ends the loop and is recorded in
# +monitor_thread_error+ rather than propagated.
#
# @return [Thread] the newly started thread
def monitor_thread
  Thread.new do
    begin
      while fd
        # Read with a one second timeout; a nil result just means "try again".
        chunk = fd.get_once(-1, 1.0)
        store_data(chunk) if chunk
      end
    rescue ::Exception => err
      self.monitor_thread_error = err
    end
  end
end

#put(data, opts = {}) ⇒ Object

Push data back into the associated stream socket. Logging must occur elsewhere, this function is simply a passthrough.



82
83
84
# File 'lib/rex/io/ring_buffer.rb', line 82

# Push data back into the associated stream socket. Logging must occur
# elsewhere; this method is simply a passthrough to +fd.put+.
#
# @param data [String] the data to write
# @param opts [Hash] options forwarded unchanged to +fd.put+
# @return [Object] whatever +fd.put+ returns
def put(data, opts={})
  # Forward the caller's options as-is. Writing `opts={}` at the call site
  # would re-assign the local to an empty hash and silently drop them.
  self.fd.put(data, opts)
end

#read_data(ptr = nil) ⇒ Object

The read_data method returns a two element array with the new reader cursor (a sequence number) and the returned data buffer (if any). A result of nil/nil indicates that no data is available



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/rex/io/ring_buffer.rb', line 127

# Read everything stored at or after sequence number +ptr+.
#
# @param ptr [Integer, nil] the reader's cursor (a sequence number, not a
#   queue index); nil means "start from the oldest entry in the ring"
# @return [Array] a two-element array: the new reader cursor (last sequence
#   number read, plus one) and the concatenated data. [nil, nil] means no
#   data is available for that cursor.
def read_data(ptr=nil)
  mutex.synchronize do
    oldest = queue[beg]

    # An empty slot at the base index means nothing has been stored yet.
    return [nil, nil] unless oldest

    # Default the cursor to the oldest sequence number we still hold.
    base = oldest[0]
    ptr ||= base
    return [nil, nil] unless ptr

    # A cursor older than the base refers to data that was already
    # overwritten, so skip ahead to the oldest entry that survives.
    ptr = base if ptr < base

    # A cursor beyond the newest stored sequence number has nothing to read.
    return [nil, nil] if ptr > seq

    # Translate the cursor into the queue index of the first block to read.
    # Holding the mutex guarantees the blocks from here up to +seq+ are
    # contiguous, so there is no need to scan for them.
    start = (beg + (ptr - base)) % size

    # Walk forward one block per sequence number, wrapping around the end
    # of the array, and remember the last sequence number actually read.
    buff = ""
    lst  = ptr
    ptr.upto(seq).each_with_index do |_seq_no, step|
      entry = queue[(start + step) % size]
      lst   = entry[0]
      buff += entry[1]
    end

    [lst + 1, buff]
  end
end

#selectObject

The select method returns when there is a chance of new data. XXX: This is mostly useless and requires a rewrite to use a real select or notify mechanism.


207
208
209
# File 'lib/rex/io/ring_buffer.rb', line 207

# Poll the underlying +fd+ for readability or an error condition, waiting
# at most a tenth of a second.
#
# @return [Array, nil] the result of IO.select: three arrays of ready
#   objects, or nil when the timeout expires first
def select
  sock = fd
  ::IO.select([sock], nil, [sock], 0.10)
end

#start_monitorObject

Start the built-in monitor, not called when used in a larger framework



49
50
51
# File 'lib/rex/io/ring_buffer.rb', line 49

# Start the built-in monitor thread unless one is already registered.
# Not called when the buffer is driven by a larger framework.
#
# @return [Thread, nil] the newly started thread, or nil when a monitor
#   already exists
def start_monitor
  return if monitor

  self.monitor = monitor_thread
end

#stop_monitorObject

Stop the built-in monitor



56
57
58
59
# File 'lib/rex/io/ring_buffer.rb', line 56

# Stop the built-in monitor: kill the thread if there is one, then forget
# the handle so start_monitor can launch a new one later.
#
# @return [nil]
def stop_monitor
  worker = monitor
  worker.kill if worker
  self.monitor = nil
end

#store_data(data) ⇒ Object

The store_data method is used to insert data into the ring buffer.



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/rex/io/ring_buffer.rb', line 101

# Insert +data+ into the ring buffer under the mutex, tagging it with the
# next sequence number.
#
# @param data [String] the fragment to store
# @return [Integer] the queue index the fragment was written to
def store_data(data)
  mutex.synchronize do
    # The very first fragment goes into slot 0. Every later one goes into
    # the slot after +cur+, wrapping around at the end of the array.
    first_write = !(seq > 0)
    slot = first_write ? 0 : (cur + 1) % size

    # When the target slot is at or before the oldest entry we are about
    # to overwrite it, so the base index moves forward one slot (wrapping
    # if necessary).
    if !first_write && slot <= beg
      self.beg = (beg + 1) % size
    end

    self.seq += 1
    queue[slot] = [seq, data]
    self.cur = slot
  end
end

#wait(seq) ⇒ Object

The wait method blocks until new data is available



214
215
216
217
218
219
220
# File 'lib/rex/io/ring_buffer.rb', line 214

# Block until read_data reports data for the given reader cursor.
#
# Polls read_data and, while nothing is available, calls select to pause
# until the underlying fd shows activity or the poll interval elapses.
#
# @param seq [Integer, nil] the reader cursor (sequence number) to wait on
# @return [nil]
def wait(seq)
  nseq = nil
  until nseq
    nseq, _data = read_data(seq)

    # Only poll when nothing was available. Polling after a successful
    # read would just delay the return by up to one select timeout.
    select unless nseq
  end
end

#wait_for(seq, timeout = 1) ⇒ Object

The wait_for method blocks until new data is available or the timeout is reached



225
226
227
228
229
230
231
232
# File 'lib/rex/io/ring_buffer.rb', line 225

# Block until data is available for the given reader cursor, giving up
# quietly once +timeout+ seconds have passed.
#
# @param seq [Integer, nil] the reader cursor (sequence number) to wait on
# @param timeout [Numeric] maximum number of seconds to wait
# @return [Object, nil] the result of wait, or nil if the timeout expired
def wait_for(seq,timeout=1)
  ::Timeout.timeout(timeout) { wait(seq) }
rescue ::Timeout::Error
  # Running out of time is an expected outcome, not an error.
  nil
end