Module: RxIO::IOBase
Overview
I/O Base Module
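Constant Summary
- CHUNK_SIZE = 1024
  Chunk Size: maximum number of bytes requested per non-blocking read in read_sock.
- RETRY_EXCEPTIONS = [ OpenSSL::SSL::SSLErrorWaitReadable, IO::EAGAINWaitReadable ]
  Retry Exceptions: exception classes that signal "no data available yet". read_sock stops reading when one is raised instead of dropping the endpoint.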
Instance Method Summary
- #drop_endpoint(endpoint) ⇒ Object
  Drop Endpoint: Notifies the Service Handler and Parent Implementation (in that order) of a dropped endpoint.
- #peer_error(p, _e) ⇒ Object
  Peer Error: Handles an Error from a Peer by dropping the corresponding endpoint.
- #process_input(endpoint, chunk) ⇒ Object
  Process Input: Processes Input for an Endpoint, in the form of a data chunk.
- #read_sock(s) ⇒ Object
  Read Socket: Reads from a given socket s in non-blocking chunks of up to CHUNK_SIZE bytes until the socket would block, then passes the accumulated data to process_input. Any other read error drops the endpoint.
- #write_sock(s) ⇒ Object
  Write Socket: Attempts to write as many bytes as possible to a given socket s from the associated client’s output buffer.
Instance Method Details
#drop_endpoint(endpoint) ⇒ Object
Drop Endpoint: Notifies the Service Handler and Parent Implementation (in that order) of a dropped endpoint.
    # File 'lib/rxio/io_base.rb', line 103

    def drop_endpoint endpoint

      # Notify Service Handler
      @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop

      # Drop Endpoint
      on_drop endpoint
    end
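The notification order can be illustrated with a minimal, self-contained sketch. `DemoIO` and `LoggingHandler` are hypothetical stand-ins and not part of RxIO. The `drop_endpoint` body mirrors the listing above, and the `on_drop` method on `DemoIO` is only assumed to play the role of the parent implementation's hook.

```ruby
# Hypothetical service handler: records that it was notified of a drop.
module LoggingHandler
  def self.on_drop(endpoint)
    endpoint[:log] << :handler
  end
end

# Hypothetical host class standing in for whatever includes RxIO::IOBase.
class DemoIO
  def initialize(service_handler)
    @service_handler = service_handler
  end

  # Same logic as the documented method: handler first, then parent.
  def drop_endpoint(endpoint)
    @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop
    on_drop endpoint
  end

  # Parent implementation hook (assumed here to just record the call).
  def on_drop(endpoint)
    endpoint[:log] << :parent
  end
end

endpoint = { log: [] }
DemoIO.new(LoggingHandler).drop_endpoint(endpoint)
p endpoint[:log]
```

Because of the `respond_to?` guard, a service handler that does not define `on_drop` is skipped and only the parent hook runs.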
#peer_error(p, _e) ⇒ Object
Peer Error: Handles an Error from a Peer by dropping the corresponding endpoint.
    # File 'lib/rxio/io_base.rb', line 116

    def peer_error p, _e
      drop_endpoint p
    end
#process_input(endpoint, chunk) ⇒ Object
Process Input: Processes Input for an Endpoint, in the form of a data chunk.
    # File 'lib/rxio/io_base.rb', line 26

    def process_input endpoint, chunk

      # Begin
      begin

        # Pass through Service Handler Module
        @service_handler.filter_input endpoint, chunk

        # Process Messages
        @service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?

        # Sub-Process Input
        @service_handler.subprocess_input endpoint if @service_handler.respond_to? :subprocess_input

      # Rescue
      rescue Exception => e

        # Peer Error
        peer_error endpoint, e
      end
    end
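As the listing shows, process_input expects the service handler to respond to `filter_input` and `handle_msg`, and optionally to `subprocess_input`. The following is a rough sketch of what such a handler might look like, not RxIO's own implementation: `LineHandler`, the `dispatch` helper, and the `:ibuf` and `:handled` endpoint keys are all hypothetical names chosen for illustration. Only `:msgs` appears in the source.

```ruby
# Hypothetical line-oriented service handler.
module LineHandler
  # Append the chunk to a per-endpoint buffer, then move every complete
  # newline-terminated line into the endpoint's :msgs queue.
  def self.filter_input(endpoint, chunk)
    endpoint[:ibuf] << chunk
    while (idx = endpoint[:ibuf].index("\n"))
      endpoint[:msgs] << endpoint[:ibuf].slice!(0, idx + 1).chomp
    end
  end

  # Called once per complete message.
  def self.handle_msg(endpoint, msg)
    endpoint[:handled] << msg.upcase
  end
end

# Stand-in for the documented dispatch logic (error handling omitted).
def dispatch(handler, endpoint, chunk)
  handler.filter_input endpoint, chunk
  handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?
  handler.subprocess_input endpoint if handler.respond_to? :subprocess_input
end

endpoint = { ibuf: String.new, msgs: [], handled: [] }
dispatch LineHandler, endpoint, "hello\nwor" # second line is still incomplete
dispatch LineHandler, endpoint, "ld\n"       # completes the second line
p endpoint[:handled]
```

Chunk boundaries do not need to align with message boundaries in this sketch: the partial line stays in the buffer until a later chunk completes it.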
#read_sock(s) ⇒ Object
Read Socket: Reads from a given socket s in non-blocking chunks of up to CHUNK_SIZE bytes until the socket would block, then passes the accumulated data to process_input. Any other read error drops the endpoint.
    # File 'lib/rxio/io_base.rb', line 51

    def read_sock s

      # Acquire Endpoint for Socket
      e = get_endpoint_for_sock s

      # Check Endpoint
      return unless e

      # OpenSSL doesn't play nice with 'select' because of intermediate buffering.
      # Therefore we loop until we either have an explicit error or a signal to wait.
      done = false
      chunk = ''
      rdstr = ''
      until done || !rdstr
        begin
          rdstr = s.read_nonblock(CHUNK_SIZE)
          chunk << rdstr if rdstr
        rescue Exception => ex
          if RETRY_EXCEPTIONS.include? ex.class; then done = true; else rdstr = nil; end
        end
      end

      # Drop Endpoint & Abort on Error
      return drop_endpoint e unless rdstr

      # Process Input
      process_input e, chunk
    end
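The read-until-would-block pattern can be tried in isolation on a plain pipe. This is a simplified sketch rather than the method above: `drain` is a hypothetical helper, the chunk size is deliberately tiny, and it rescues the `IO::WaitReadable` module instead of matching exact classes against RETRY_EXCEPTIONS. A `nil` return corresponds to the case where read_sock drops the endpoint.

```ruby
CHUNK_SIZE = 4 # deliberately small so the loop runs several times

# Hypothetical helper: read until the descriptor would block.
# Returns the accumulated data, or nil on EOF or a hard error.
def drain(io)
  chunk = String.new
  loop do
    begin
      chunk << io.read_nonblock(CHUNK_SIZE)
    rescue IO::WaitReadable
      return chunk # nothing more to read right now
    rescue EOFError, SystemCallError
      return nil   # peer closed or hard error: caller should drop the endpoint
    end
  end
end

r, w = IO.pipe
w.write 'hello world'
w.flush
p drain(r) # all buffered data, gathered in 4-byte reads
w.close
p drain(r) # writer is gone and nothing is buffered
```

Looping until the would-block signal, rather than reading once per readiness notification, matches the reasoning given in the source comment about OpenSSL's intermediate buffering.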
#write_sock(s) ⇒ Object
Write Socket: Attempts to write as many bytes as possible to a given socket s from the associated client’s output buffer.
    # File 'lib/rxio/io_base.rb', line 83

    def write_sock s

      # Acquire Endpoint
      e = get_endpoint_for_sock s

      # Check Endpoint
      return unless e

      # Synchronize Endpoint
      e[:lock].synchronize do

        # Write as much as possible
        size = (s.write_nonblock e[:obuf] rescue nil) || 0
        e[:obuf].slice!(0, size) if size > 0
      end
    end
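The write path relies on `write_nonblock` returning the number of bytes actually accepted, so that only that prefix is removed from the output buffer. A minimal sketch on a plain pipe, assuming an endpoint hash with the `:obuf` and `:lock` keys seen in the listing: `flush_obuf` is a hypothetical helper, and unlike the original's blanket `rescue nil` it only rescues `IO::WaitWritable`.

```ruby
# Hypothetical helper: write as much of the output buffer as the
# descriptor accepts, and keep the unwritten remainder for later.
def flush_obuf(io, endpoint)
  endpoint[:lock].synchronize do
    size = begin
      io.write_nonblock endpoint[:obuf]
    rescue IO::WaitWritable
      0 # descriptor is full: nothing written, try again later
    end
    endpoint[:obuf].slice!(0, size) if size > 0
    size
  end
end

endpoint = { obuf: String.new('hello world'), lock: Mutex.new }
r, w = IO.pipe

written = flush_obuf(w, endpoint)
p written                # number of bytes the pipe accepted
p endpoint[:obuf]        # whatever is still waiting to be sent
p r.read_nonblock(64)    # what arrived on the other side
```

Holding the endpoint lock around both the write and the `slice!` keeps another thread from appending to or trimming the buffer between the two steps.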