Module: RxIO::IOBase
Overview
I/O Base Module
Constant Summary
- CHUNK_SIZE = 1024
  Chunk Size: the maximum number of bytes requested per read_nonblock call in read_sock.
Instance Method Summary
- #drop_endpoint(endpoint) ⇒ Object
  Drop Endpoint: Notifies the Service Handler and Parent Implementation (in that order) of a dropped endpoint.
- #peer_error(p, _e) ⇒ Object
  Peer Error: Handles an error from a peer by dropping its endpoint. The error object itself is ignored.
- #process_input(endpoint, chunk) ⇒ Object
  Process Input: Processes input for an endpoint, in the form of a data chunk. Any exception raised along the way is treated as a peer error.
- #read_sock(s) ⇒ Object
  Read Socket: Reads as many bytes as possible from a given socket s, in non-blocking reads of up to CHUNK_SIZE bytes each, and passes the accumulated data to process_input.
- #write_sock(s) ⇒ Object
  Write Socket: Attempts to write as many bytes as possible to a given socket s from the associated client’s output buffer.
Instance Method Details
#drop_endpoint(endpoint) ⇒ Object
Drop Endpoint: Notifies the Service Handler and Parent Implementation (in that order) of a dropped endpoint.
    # File 'lib/rxio/io_base.rb', line 92

    def drop_endpoint endpoint

      # Notify Service Handler
      @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop

      # Drop Endpoint
      on_drop endpoint
    end
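The notification order can be illustrated with a small stand-alone sketch. It does not use RxIO itself: `DemoIO`, `DemoHandler`, and the `LOG` array are hypothetical names, and the `on_drop` instance method merely stands in for whatever the parent implementation does when an endpoint is dropped.

```ruby
# Records the order in which the hooks fire (hypothetical, for illustration).
LOG = []

# Hypothetical service handler that defines the optional on_drop hook.
module DemoHandler
  def self.on_drop(endpoint)
    LOG << [:handler, endpoint[:id]]
  end
end

# Hypothetical stand-in for a class that includes the I/O base module.
class DemoIO
  def initialize(service_handler)
    @service_handler = service_handler
  end

  # Same shape as the documented method: handler first, then parent.
  def drop_endpoint(endpoint)
    @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop
    on_drop endpoint
  end

  # Stand-in for the parent implementation's cleanup.
  def on_drop(endpoint)
    LOG << [:parent, endpoint[:id]]
  end
end

DemoIO.new(DemoHandler).drop_endpoint({ id: 1 })
DemoIO.new(Module.new).drop_endpoint({ id: 2 }) # handler without on_drop
```

Because of the `respond_to?` guard, a handler that does not define `on_drop` is simply skipped and only the parent hook runs.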
#peer_error(p, _e) ⇒ Object
Peer Error: Handles an error from a peer by dropping its endpoint. The error object itself is ignored.

    # File 'lib/rxio/io_base.rb', line 105

    def peer_error p, _e
      drop_endpoint p
    end
#process_input(endpoint, chunk) ⇒ Object
Process Input: Processes input for an endpoint, in the form of a data chunk. The chunk is passed through the service handler's filter_input, each message queued in endpoint[:msgs] is dispatched to handle_msg, and subprocess_input is called if the handler defines it. Any exception raised along the way is treated as a peer error.

    # File 'lib/rxio/io_base.rb', line 17

    def process_input endpoint, chunk

      # Begin
      begin

        # Pass through Service Handler Module
        @service_handler.filter_input endpoint, chunk

        # Process Messages
        @service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?

        # Sub-Process Input
        @service_handler.subprocess_input endpoint if @service_handler.respond_to? :subprocess_input

      # Rescue
      rescue Exception => e

        # Peer Error
        peer_error endpoint, e
      end
    end
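To make the handler contract more concrete, here is a minimal sketch of a line-based service handler and a driver that calls it in the same order as process_input. Everything in it is an assumption made for illustration: `LineEchoHandler` and `demo_process_input` are hypothetical names, and the `:ibuf` key is assumed (only `:msgs`, `:obuf`, and `:lock` appear in the source shown here).

```ruby
# Hypothetical handler: splits input into newline-terminated messages.
module LineEchoHandler
  # Buffer the chunk, then move every complete line into endpoint[:msgs].
  def self.filter_input(endpoint, chunk)
    endpoint[:ibuf] << chunk
    while (line = endpoint[:ibuf].slice!(/\A[^\n]*\n/))
      endpoint[:msgs] << line.chomp
    end
  end

  # Queue a reply in the output buffer for each complete message.
  def self.handle_msg(endpoint, msg)
    endpoint[:obuf] << "echo: #{msg}\n"
  end
end

# Calls the handler in the same order as the documented process_input.
def demo_process_input(handler, endpoint, chunk)
  handler.filter_input(endpoint, chunk)
  handler.handle_msg(endpoint, endpoint[:msgs].shift) until endpoint[:msgs].empty?
  handler.subprocess_input(endpoint) if handler.respond_to?(:subprocess_input)
end

# Assumed endpoint layout; +'' creates unfrozen, mutable strings.
endpoint = { ibuf: +'', obuf: +'', msgs: [] }
demo_process_input(LineEchoHandler, endpoint, "hel")
demo_process_input(LineEchoHandler, endpoint, "lo\nwor")
demo_process_input(LineEchoHandler, endpoint, "ld\n")
```

Chunks need not align with message boundaries: the partial line "wor" stays in the input buffer until the next chunk completes it.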
#read_sock(s) ⇒ Object
Read Socket: Reads as many bytes as possible from a given socket s, in non-blocking reads of up to CHUNK_SIZE bytes each, and passes the accumulated data to process_input. Reading stops when the socket raises OpenSSL::SSL::SSLErrorWaitReadable; any other exception drops the endpoint.

    # File 'lib/rxio/io_base.rb', line 42

    def read_sock s

      # Acquire Endpoint for Socket
      e = get_endpoint_for_sock s

      # Check Endpoint
      return unless e

      # OpenSSL doesn't play nice with 'select' because of intermediate buffering.
      # Therefore we loop until we either have an explicit error or a signal to wait.
      done = false
      chunk = ''
      until done || !chunk
        begin
          chunk << s.read_nonblock(CHUNK_SIZE)
        rescue Exception => ex
          if ex.is_a? OpenSSL::SSL::SSLErrorWaitReadable
            done = true
          else
            chunk = nil
          end
        end
      end

      # Drop Endpoint & Abort on Error
      return drop_endpoint e unless chunk

      # Process Input
      process_input e, chunk
    end
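The read-until-wait pattern can be tried outside RxIO with a plain pipe. The sketch below is a variant, not the library's code: it rescues IO::WaitReadable (the module that OpenSSL::SSL::SSLErrorWaitReadable includes) so that it also works on non-SSL IO objects. `drain_nonblock` and `CHUNK` are hypothetical names.

```ruby
CHUNK = 1024 # mirrors CHUNK_SIZE from the constant summary

# Read everything currently available from io without blocking.
# Returns the accumulated data, or nil if the peer closed the
# connection or a system-level error occurred.
def drain_nonblock(io)
  data = +''
  loop do
    data << io.read_nonblock(CHUNK)
  end
rescue IO::WaitReadable
  data # nothing more to read right now
rescue EOFError, SystemCallError
  nil  # the caller would drop the endpoint here
end

reader, writer = IO.pipe
writer.write('x' * 3000)
received = drain_nonblock(reader)    # several reads of up to CHUNK bytes

writer.close
after_close = drain_nonblock(reader) # EOF is reported as nil
```

As in the documented method, data gathered before an error is discarded once the error occurs; a real implementation might prefer to process it first.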
#write_sock(s) ⇒ Object
Write Socket: Attempts to write as many bytes as possible to a given socket s from the associated client’s output buffer. The write happens while holding the endpoint's lock, the bytes actually written are removed from the buffer, and a failed write is treated as zero bytes written.

    # File 'lib/rxio/io_base.rb', line 72

    def write_sock s

      # Acquire Endpoint
      e = get_endpoint_for_sock s

      # Check Endpoint
      return unless e

      # Synchronize Endpoint
      e[:lock].synchronize do

        # Write as much as possible
        size = (s.write_nonblock e[:obuf] rescue nil) || 0
        e[:obuf].slice! 0, size if size > 0
      end
    end
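Partial writes are the reason the buffer is sliced rather than cleared. The following sketch shows the same idea on a pipe; it is a variant that rescues only IO::WaitWritable instead of using the blanket `rescue nil` modifier, and `flush_obuf` plus the endpoint hash built here are hypothetical.

```ruby
# Write as much of endpoint[:obuf] as io accepts right now and
# remove the written bytes from the buffer. Returns the byte count.
def flush_obuf(io, endpoint)
  endpoint[:lock].synchronize do
    begin
      written = io.write_nonblock(endpoint[:obuf])
    rescue IO::WaitWritable
      written = 0 # io cannot accept data at the moment
    end
    endpoint[:obuf].slice!(0, written) if written > 0
    written
  end
end

reader, writer = IO.pipe
total = 1_000_000 # assumed to be larger than the pipe's kernel buffer
endpoint = { obuf: 'y' * total, lock: Mutex.new }

first  = flush_obuf(writer, endpoint) # typically a partial write
second = flush_obuf(writer, endpoint) # pipe is likely full by now
remaining = endpoint[:obuf].bytesize
```

Whatever the platform's pipe capacity, the bytes written plus the bytes left in the buffer always add up to the original size, so nothing is lost between calls.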