Module: RxIO::IOBase

Included in:
Client, Service
Defined in:
lib/rxio/io_base.rb

Overview

I/O Base Module

Constant Summary collapse

CHUNK_SIZE =

Chunk Size

1024

Instance Method Summary collapse

Instance Method Details

#drop_endpoint(endpoint) ⇒ Object

Drop Endpoint Notifies the Service Handler and Parent Implementation (in that order) of a dropped endpoint

Parameters:

  • endpoint (Hash)


70
71
72
73
74
75
76
77
# File 'lib/rxio/io_base.rb', line 70

def drop_endpoint endpoint

  # Notify Service Handler
  @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop

  # Drop Endpoint
  on_drop endpoint
end

#process_input(endpoint, chunk) ⇒ Object

Process Input Processes Input for an Endpoint, in the form of a data chunk

Parameters:

  • endpoint (Hash)
  • chunk (String)

    A chunk of data, as received by the socket



17
18
19
20
21
22
23
24
# File 'lib/rxio/io_base.rb', line 17

def process_input endpoint, chunk

  # Pass through Service Handler Module
  @service_handler.filter_input endpoint, chunk

  # Process Messages
  @service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?
end

#read_sock(s) ⇒ Object

Read Socket Attempts to read as many bytes as possible (up to CHUNK_SIZE) from a given socket s, passing the data chunks to process_input.

Parameters:

  • s (TCPSocket)


29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/rxio/io_base.rb', line 29

def read_sock s

  # Acquire Endpoint for Socket
  e = get_endpoint_for_sock s

  # Check Endpoint
  return unless e

  # Read Chunk from Socket
  chunk = s.read_nonblock CHUNK_SIZE rescue nil

  # Drop Endpoint & Abort on Error
  return drop_endpoint e unless chunk

  # Process Input
  process_input e, chunk
end

#write_sock(s) ⇒ Object

Write Socket Attempts to write as many bytes as possible to a given socket s from the associated client’s output buffer.

Parameters:

  • s (TCPSocket)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/rxio/io_base.rb', line 50

def write_sock s

  # Acquire Endpoint
  e = get_endpoint_for_sock s

  # Check Endpoint
  return unless e

  # Synchronize Endpoint
  e[:lock].synchronize do

    # Write as much as possible
    size = (s.write_nonblock e[:obuf] rescue nil) || 0
    e[:obuf].slice! 0, size if size > 0
  end
end