Module: RxIO::IOBase

Included in:
Client, Service
Defined in:
lib/rxio/io_base.rb

Overview

I/O Base Module

Constant Summary

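CHUNK_SIZE =

Chunk Size: maximum number of bytes requested per read_nonblock call in read_sock.

1024

RETRY_EXCEPTIONS =

Retry Exceptions: exception classes that read_sock treats as a signal to stop reading and wait, rather than as an error.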

[
  OpenSSL::SSL::SSLErrorWaitReadable,
  IO::EAGAINWaitReadable
]

Instance Method Details

#drop_endpoint(endpoint) ⇒ Object

Drop Endpoint: Notifies the Service Handler and Parent Implementation (in that order) of a dropped endpoint.

Parameters:

  • endpoint: Endpoint Hash



# File 'lib/rxio/io_base.rb', line 103

def drop_endpoint endpoint

  # Notify Service Handler
  @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop

  # Drop Endpoint
  on_drop endpoint
end
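
The notification order described above can be illustrated with a small stand-alone sketch. Everything except the names on_drop and drop_endpoint is hypothetical: LoggingHandler, DemoParent and the :log key are invented for the example, and DemoParent merely stands in for a class that includes RxIO::IOBase and supplies its own on_drop (assumed here to be what Client and Service do).

```ruby
# Hypothetical service handler. on_drop is optional, which is why the
# listing above checks respond_to? before calling it.
module LoggingHandler
  def self.on_drop(endpoint)
    endpoint[:log] << :handler_on_drop
  end
end

# Hypothetical stand-in for a class that includes RxIO::IOBase.
class DemoParent
  def initialize(service_handler)
    @service_handler = service_handler
  end

  # Same body as the drop_endpoint listing above.
  def drop_endpoint(endpoint)
    @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop
    on_drop endpoint
  end

  # Parent implementation hook (assumed to be provided by the including class).
  def on_drop(endpoint)
    endpoint[:log] << :parent_on_drop
  end
end

# Handler that defines on_drop: handler first, then parent.
endpoint = { log: [] }
DemoParent.new(LoggingHandler).drop_endpoint(endpoint)

# Handler without on_drop: only the parent hook runs.
silent = { log: [] }
DemoParent.new(Module.new).drop_endpoint(silent)
```

In this sketch a handler that omits on_drop is skipped without error, so only the parent hook runs for the second endpoint.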

#peer_error(p, _e) ⇒ Object

Peer Error: Handles an error raised while processing a peer's input by dropping the peer's endpoint. The exception itself is not used.

Parameters:

  • p: Endpoint Hash

  • _e: Exception



# File 'lib/rxio/io_base.rb', line 116

def peer_error p, _e
  drop_endpoint p
end

#process_input(endpoint, chunk) ⇒ Object

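Process Input: Processes a chunk of input data for an endpoint. The chunk is passed to the service handler's filter_input, each message queued in endpoint[:msgs] is dispatched to handle_msg, and subprocess_input is called if the handler defines it. Any exception raised along the way is handed to peer_error.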

Parameters:

  • endpoint: Endpoint Hash

  • chunk: A chunk of data, as received by the socket



# File 'lib/rxio/io_base.rb', line 26

def process_input endpoint, chunk

  # Begin
  begin

    # Pass through Service Handler Module
    @service_handler.filter_input endpoint, chunk

    # Process Messages
    @service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?

    # Sub-Process Input
    @service_handler.subprocess_input endpoint if @service_handler.respond_to? :subprocess_input

    # Rescue
  rescue Exception => e

    # Peer Error
    peer_error endpoint, e
  end
end
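
To make the handler contract concrete, here is a minimal sketch of a service handler that process_input could drive. Only the method names filter_input and handle_msg and the :msgs key come from the listing above. LineHandler, DemoIO and the :ibuf and :seen keys are hypothetical, and the newline framing is just one possible choice. DemoIO is a condensed stand-in for a class that includes RxIO::IOBase; it rescues StandardError, whereas the listing rescues Exception.

```ruby
# Hypothetical line-based handler.
module LineHandler
  # Buffer raw bytes and move each complete line into endpoint[:msgs].
  def self.filter_input(endpoint, chunk)
    endpoint[:ibuf] << chunk
    while (line = endpoint[:ibuf].slice!(/\A.*?\n/))
      endpoint[:msgs] << line.chomp
    end
  end

  # Record each message; reject empty ones to show the error path.
  def self.handle_msg(endpoint, msg)
    raise ArgumentError, 'empty message' if msg.empty?
    endpoint[:seen] << msg
  end
end

# Hypothetical stand-in for a class that includes RxIO::IOBase.
class DemoIO
  attr_reader :dropped

  def initialize(handler)
    @service_handler = handler
    @dropped = []
  end

  # Condensed copy of the process_input listing above.
  def process_input(endpoint, chunk)
    @service_handler.filter_input endpoint, chunk
    @service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?
    @service_handler.subprocess_input endpoint if @service_handler.respond_to? :subprocess_input
  rescue StandardError => e
    peer_error endpoint, e
  end

  # Records the endpoint instead of actually dropping it.
  def peer_error(endpoint, _e)
    @dropped << endpoint
  end
end

io = DemoIO.new(LineHandler)

# Messages may be split across chunks in arbitrary places.
ep = { ibuf: String.new, msgs: [], seen: [] }
io.process_input ep, "hel"
io.process_input ep, "lo\nwor"
io.process_input ep, "ld\n"

# A handler exception is routed to peer_error.
bad = { ibuf: String.new, msgs: [], seen: [] }
io.process_input bad, "\n"
```

Because filter_input keeps the unterminated tail in its own buffer, chunk boundaries do not need to line up with message boundaries in this sketch.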

#read_sock(s) ⇒ Object

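Read Socket: Reads repeatedly from a given socket s, up to CHUNK_SIZE bytes per call, until the socket signals that no more data is currently available (see RETRY_EXCEPTIONS). The accumulated data is then passed to process_input as a single chunk. Any other exception causes the endpoint to be dropped without processing the data.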

Parameters:

  • s: Socket to read from



# File 'lib/rxio/io_base.rb', line 51

def read_sock s

  # Acquire Endpoint for Socket
  e = get_endpoint_for_sock s

  # Check Endpoint
  return unless e

  # OpenSSL doesn't play nice with 'select' because of intermediate buffering.
  # Therefore we loop until we either have an explicit error or a signal to wait.
  done = false
  chunk = ''
  rdstr = ''
  until done || !rdstr
    begin
      rdstr = s.read_nonblock(CHUNK_SIZE)
      chunk << rdstr if rdstr
    rescue Exception => ex
      if RETRY_EXCEPTIONS.include? ex.class; then done = true; else rdstr = nil; end
    end
  end

  # Drop Endpoint & Abort on Error
  return drop_endpoint e unless rdstr

  # Process Input
  process_input e, chunk
end
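
The read loop can be tried in isolation with a plain pipe instead of a socket. The following is a simplified sketch, not the library's code: ReadLoopDemo and drain are hypothetical names, CHUNK_SIZE is deliberately shrunk to 4 so the loop iterates several times, and StandardError is rescued instead of Exception. It assumes a platform where an empty non-blocking read raises IO::EAGAINWaitReadable (for example Linux).

```ruby
require 'openssl'

module ReadLoopDemo
  CHUNK_SIZE = 4 # tiny on purpose; the module documented above uses 1024
  RETRY_EXCEPTIONS = [
    OpenSSL::SSL::SSLErrorWaitReadable,
    IO::EAGAINWaitReadable
  ].freeze

  # Read until the source would block. Returns the accumulated String,
  # or nil if any non-retry exception (including EOFError) was raised.
  def self.drain(io)
    chunk = String.new
    loop do
      begin
        chunk << io.read_nonblock(CHUNK_SIZE)
      rescue StandardError => ex
        return RETRY_EXCEPTIONS.include?(ex.class) ? chunk : nil
      end
    end
  end
end

rd, wr = IO.pipe

# Ten bytes arrive; drain needs three reads before the pipe is empty.
wr.write 'abcdefghij'
drained = ReadLoopDemo.drain(rd)

# The writer goes away: read_nonblock raises EOFError, which is not retryable.
wr.close
closed = ReadLoopDemo.drain(rd)
```

In read_sock the nil case corresponds to dropping the endpoint, and the String case to calling process_input with the accumulated chunk.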

#write_sock(s) ⇒ Object

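Write Socket: Attempts to write as many bytes as possible to a given socket s from the associated endpoint's output buffer (e[:obuf]), then removes the written bytes from that buffer. The endpoint's lock is held for the duration, and a failed write is treated as zero bytes written.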

Parameters:

  • s: Socket to write to



# File 'lib/rxio/io_base.rb', line 83

def write_sock s

  # Acquire Endpoint
  e = get_endpoint_for_sock s

  # Check Endpoint
  return unless e

  # Synchronize Endpoint
  e[:lock].synchronize do

    # Write as much as possible
    size = (s.write_nonblock e[:obuf] rescue nil) || 0
    e[:obuf].slice! 0, size if size > 0
  end
end
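
The partial-write behaviour can be sketched with a pipe standing in for the socket. flush_some is a hypothetical helper that mirrors the body of write_sock; the :lock and :obuf keys come from the listing above, while the buffer size and the draining loop are assumptions made for the example.

```ruby
# Hypothetical helper mirroring the body of write_sock: write whatever the
# descriptor accepts right now and remove exactly that much from the buffer.
def flush_some(sock, endpoint)
  endpoint[:lock].synchronize do
    size = (sock.write_nonblock endpoint[:obuf] rescue nil) || 0
    endpoint[:obuf].slice! 0, size if size > 0
    size
  end
end

rd, wr = IO.pipe
ep = { lock: Mutex.new, obuf: 'x' * 100_000 }
received = 0

# Alternate between flushing and reading until the buffer is empty.
# When the pipe is full, write_nonblock raises and flush_some returns 0.
until ep[:obuf].empty?
  flush_some(wr, ep)
  begin
    received += rd.read_nonblock(65_536).bytesize
  rescue IO::WaitReadable
    # Nothing to read yet; try flushing again.
  end
end

# Collect whatever is still sitting in the pipe.
wr.close
received += rd.read.bytesize
```

One caveat worth keeping in mind: write_nonblock returns a byte count, while String#slice! with two integers counts characters. The two agree for ASCII or binary-encoded buffers, as in this sketch, but could diverge if the output buffer holds multibyte text.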