Module: RxIO::IOBase

Included in:
Client, Service
Defined in:
lib/rxio/io_base.rb

Overview

I/O Base Module

Constant Summary collapse

CHUNK_SIZE =

Chunk Size

1024

Instance Method Summary collapse

Instance Method Details

#drop_endpoint(endpoint) ⇒ Object

Drop Endpoint: Notifies the Service Handler and Parent Implementation (in that order) of a dropped endpoint.

Parameters:

  • endpoint (Hash)


83
84
85
86
87
88
89
90
# File 'lib/rxio/io_base.rb', line 83

def drop_endpoint endpoint

  # Notify Service Handler
  @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop

  # Drop Endpoint
  on_drop endpoint
end

#peer_error(p, _e) ⇒ Object

Peer Error: Handles an Error from a Peer.

Parameters:

  • p (Hash)

    Endpoint Hash

  • e (Exception)

    Exception



96
97
98
# File 'lib/rxio/io_base.rb', line 96

def peer_error p, _e
  drop_endpoint p
end

#process_input(endpoint, chunk) ⇒ Object

Process Input: Processes Input for an Endpoint, in the form of a data chunk.

Parameters:

  • endpoint (Hash)
  • chunk (String)

    A chunk of data, as received by the socket



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/rxio/io_base.rb', line 17

def process_input endpoint, chunk

  # Begin
  begin

    # Pass through Service Handler Module
    @service_handler.filter_input endpoint, chunk

    # Process Messages
    @service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?

    # Sub-Process Input
    @service_handler.subprocess_input endpoint if @service_handler.respond_to? :subprocess_input

    # Rescue
  rescue Exception => e

    # Peer Error
    peer_error endpoint, e
  end
end

#read_sock(s) ⇒ Object

Read Socket: Attempts to read as many bytes as possible (up to CHUNK_SIZE) from a given socket s, passing the data chunks to process_input.

Parameters:

  • s (TCPSocket)


42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/rxio/io_base.rb', line 42

def read_sock s

  # Acquire Endpoint for Socket
  e = get_endpoint_for_sock s

  # Check Endpoint
  return unless e

  # Read Chunk from Socket
  chunk = s.read_nonblock CHUNK_SIZE rescue nil

  # Drop Endpoint & Abort on Error
  return drop_endpoint e unless chunk

  # Process Input
  process_input e, chunk
end

#write_sock(s) ⇒ Object

Write Socket: Attempts to write as many bytes as possible to a given socket s from the associated client’s output buffer.

Parameters:

  • s (TCPSocket)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/rxio/io_base.rb', line 63

def write_sock s

  # Acquire Endpoint
  e = get_endpoint_for_sock s

  # Check Endpoint
  return unless e

  # Synchronize Endpoint
  e[:lock].synchronize do

    # Write as much as possible
    size = (s.write_nonblock e[:obuf] rescue nil) || 0
    e[:obuf].slice! 0, size if size > 0
  end
end