Class: Rex::IO::RingBuffer::UnitTest
- Inherits:
-
Test::Unit::TestCase
- Object
- Test::Unit::TestCase
- Rex::IO::RingBuffer::UnitTest
- Defined in:
- lib/rex/io/ring_buffer.rb.ut.rb
Overview
TODO: Mock up the socket so this test doesn’t take so long
Instance Method Summary
- #setup ⇒ Object
- #teardown ⇒ Object
- #test_sequential_read_data ⇒ Object
- #test_single_read_data ⇒ Object
- #test_wrap ⇒ Object
Instance Method Details
#setup ⇒ Object
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rex/io/ring_buffer.rb.ut.rb', line 14

# Builds the fixture shared by every test:
#
# * a TCP server created with 'LocalPort' => 0 (presumably lets the OS pick
#   a free port -- confirm against Rex::Socket),
# * a client socket connected to that port on 127.0.0.1, kept in @client,
# * a RingBuffer of size 64 wrapping the accepted server-side connection,
#   kept in @r, with its monitor started.
def setup
  listener = Rex::Socket.create_tcp_server('LocalPort' => 0)
  # The port to connect to is read from the listener's socket name.
  port = listener.getsockname[2]

  @client = Rex::Socket.create_tcp('PeerHost' => '127.0.0.1', 'PeerPort' => port)
  accepted = listener.accept

  # NOTE(review): the listener is intentionally left open; a call to close it
  # was disabled here by the original author (reason not recorded).
  @r = Rex::IO::RingBuffer.new(accepted, {:size => 64})
  @r.start_monitor
end
#teardown ⇒ Object
25 26 27 28 29 30 31 |
# File 'lib/rex/io/ring_buffer.rb.ut.rb', line 25

# Best-effort cleanup after each test: closes the client socket (@client)
# and stops the ring buffer's monitor (@r).
#
# The two steps are attempted independently, so an error raised while
# closing the client can no longer skip stopping the monitor. Only
# StandardError is swallowed; signals, SystemExit and other non-standard
# exceptions propagate instead of being silently discarded.
def teardown
  begin
    @client.close
  rescue ::StandardError
    # Best effort: the socket may already be closed, or setup may have
    # failed before @client was assigned.
  end

  begin
    @r.stop_monitor
  rescue ::StandardError
    # Best effort: the monitor may not be running, or setup may have failed
    # before @r was assigned.
  end
end
#test_sequential_read_data ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/rex/io/ring_buffer.rb.ut.rb', line 41

# Sends the strings "0" through "10" one at a time and checks that each
# read_data call, resumed from the sequence value returned by the previous
# call, yields only the string that was just sent.
def test_sequential_read_data
  @r.clear_data

  # Sequence value returned by the previous read; nil before the first read.
  seq = nil

  (0..10).each do |i|
    @client.put(i.to_s)
    # Synchronise with the buffer before reading (wait presumably blocks
    # until data past +seq+ is available -- confirm against RingBuffer#wait).
    @r.wait(seq)
    seq, data = @r.read_data(seq)
    assert_equal(i.to_s, data)
  end
end
#test_single_read_data ⇒ Object
33 34 35 36 37 38 39 |
# File 'lib/rex/io/ring_buffer.rb.ut.rb', line 33

# Sends one payload through the client socket and checks that a single
# read_data call (with no starting sequence) returns exactly that payload.
def test_single_read_data
  @client.put("123")
  @r.wait(0)

  # read_data returns a pair; only the data half is checked here.
  _seq, data = @r.read_data
  assert_equal("123", data)
end
#test_wrap ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/rex/io/ring_buffer.rb.ut.rb', line 53

# Fills the buffer with @r.size separate one-byte writes, reads everything
# to obtain the current sequence value, then sends one more byte and checks
# that reading from that sequence value returns only the new byte.
#
# The final read is synchronised with @r.wait(seq), the same way the other
# tests in this case do it, instead of a fixed 10 ms sleep that made the
# assertion depend on how quickly the monitor picked up the last write.
def test_wrap
  @r.clear_data

  @r.size.times do
    @client.put("a")
    # Need to sleep so the socket doesn't get all the data in one read()
    sleep 0.05
  end

  # Only the sequence value is needed from this first read.
  seq, _filler = @r.read_data

  @client.put("b")
  @r.wait(seq)

  _next_seq, data = @r.read_data(seq)
  assert_equal("b", data)
end