Class: Cosmos::TcpipSocketStream

Inherits:
Stream show all
Defined in:
lib/cosmos/streams/tcpip_socket_stream.rb

Overview

Data Stream which reads and writes from Tcpip Sockets.

Direct Known Subclasses

TcpipClientStream

Instance Attribute Summary collapse

Attributes inherited from Stream

#raw_logger_pair

Instance Method Summary collapse

Constructor Details

#initialize(write_socket, read_socket, write_timeout, read_timeout) ⇒ TcpipSocketStream

Returns a new instance of TcpipSocketStream.

Parameters:

  • write_socket (Socket)

    Socket to write

  • read_socket (Socket)

    Socket to read

  • write_timeout (Float|nil)

    Number of seconds to wait for the write to complete or nil to block until the socket is ready to write.

  • read_timeout (Float|nil)

    Number of seconds to wait for the read to complete or nil to block until the socket is ready to read.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 29

def initialize(write_socket, read_socket, write_timeout, read_timeout)
  super()

  @write_socket  = write_socket
  @read_socket   = read_socket
  @write_timeout = ConfigParser.handle_nil(write_timeout)
  @write_timeout = @write_timeout.to_f if @write_timeout
  @read_timeout  = ConfigParser.handle_nil(read_timeout)
  @read_timeout  = @read_timeout.to_f if @read_timeout
  @connected = true

  # Mutex on write is needed to protect from commands coming in from more
  # than one tool
  @write_mutex = Mutex.new
end

Instance Attribute Details

#write_socketObject (readonly)

Returns the value of attribute write_socket.



21
22
23
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 21

def write_socket
  @write_socket
end

Instance Method Details

#connected?Boolean

Returns Whether the sockets are connected.

Returns:

  • (Boolean)

    Whether the sockets are connected



124
125
126
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 124

def connected?
  @connected
end

#disconnectObject

Disconnect by closing the sockets



129
130
131
132
133
134
135
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 129

def disconnect
  if @connected
    @write_socket.close if @write_socket and !@write_socket.closed?
    @read_socket.close if @read_socket and !@read_socket.closed?
    @connected = false
  end
end

#readString

Returns a binary string of data from the socket

Returns:

  • (String)

    Returns a binary string of data from the socket



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 46

def read
  raise "Attempt to read from write only stream" unless @read_socket

  # No read mutex is needed because there is only one stream procesor
  # reading
  begin
    data = @read_socket.recv_nonblock(65535)
    @raw_logger_pair.read_logger.write(data) if @raw_logger_pair
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK
    # Wait for the socket to be ready for reading or for the timeout
    begin
      result = IO.fast_select([@read_socket], nil, nil, @read_timeout)

      # If select returns something it means the socket is now available for
      # reading so retry the read. If it returns nil it means we timed out.
      if result
        retry
      else
        raise Timeout::Error, "Read Timeout"
      end
    rescue IOError, Errno::ENOTSOCK
      # These can happen with the socket being closed while waiting on select
      data = ''
    end
  rescue Errno::ECONNRESET, Errno::ECONNABORTED
    data = ''
  end

  data
end

#read_nonblockString

Returns a binary string of data from the socket. Always returns immediately

Returns:

  • (String)

    Returns a binary string of data from the socket. Always returns immediately



78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 78

def read_nonblock
  # No read mutex is needed because there is only one stream procesor
  # reading
  begin
    data = @read_socket.recv_nonblock(65535)
    @raw_logger_pair.read_logger.write(data) if @raw_logger_pair
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::ECONNRESET, Errno::ECONNABORTED
    data = ''
  end

  data
end

#write(data) ⇒ Object

Parameters:

  • data (String)

    A binary string of data to write to the socket



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 92

def write(data)
  raise "Attempt to write to read only stream" unless @write_socket

  @write_mutex.synchronize do
    num_bytes_to_send = data.length
    total_bytes_sent = 0
    bytes_sent = 0
    data_to_send = data

    loop do
      begin
        bytes_sent = @write_socket.write_nonblock(data_to_send)
        @raw_logger_pair.write_logger.write(data_to_send[0..(bytes_sent - 1)]) if @raw_logger_pair and bytes_sent > 0
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK
        # Wait for the socket to be ready for writing or for the timeout
        result = IO.fast_select(nil, [@write_socket], nil, @write_timeout)
        # If select returns something it means the socket is now available for
        # writing so retry the write. If it returns nil it means we timed out.
        if result
          retry
        else
          raise Timeout::Error, "Write Timeout"
        end
      end
      total_bytes_sent += bytes_sent
      break if total_bytes_sent >= num_bytes_to_send
      data_to_send = data[total_bytes_sent..-1]
    end
  end
end