Module: RxIO::IOBase

Included in:
Client, Service
Defined in:
lib/rxio/io_base.rb

Overview

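I/O Base Module: shared socket I/O logic mixed into Client and Service. It reads from and writes to non-blocking sockets, passes received data to the service handler, and drops endpoints on error.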

Constant Summary

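CHUNK_SIZE = 1024

Chunk Size: maximum number of bytes requested per read_nonblock call.

RETRY_EXCEPTIONS = [
	OpenSSL::SSL::SSLErrorWaitReadable,
	IO::EAGAINWaitReadable
]

Retry Exceptions: exception classes that mean "no more data right now". read_sock stops reading when one of them is raised, without dropping the endpoint.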

Instance Method Details

#drop_endpoint(endpoint) ⇒ Object

Drop Endpoint: Notifies the Service Handler (only if it responds to on_drop) and then the Parent Implementation (via on_drop) of a dropped endpoint, in that order.

Parameters:

  • endpoint (Hash)


# File 'lib/rxio/io_base.rb', line 103

def drop_endpoint endpoint

	# Notify Service Handler
	@service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop

	# Drop Endpoint
	on_drop endpoint
end
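
The notification order can be checked with a small stand-alone sketch. DemoHandler and DemoHost are hypothetical stand-ins, not part of RxIO; the body of drop_endpoint is copied from the listing above so the example runs without the gem, and the parent on_drop is assumed to do nothing more than record the call.

```ruby
# Hypothetical stand-ins: DemoHandler and DemoHost are not part of RxIO.
module DemoHandler
  def self.events
    @events ||= []
  end

  # Optional hook: drop_endpoint only calls it when the handler responds to it.
  def self.on_drop(endpoint)
    events << [:handler, endpoint[:name]]
  end
end

class DemoHost
  def initialize(service_handler)
    @service_handler = service_handler
  end

  # Same body as the listing above, reproduced so the sketch is self-contained.
  def drop_endpoint(endpoint)
    @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop
    on_drop endpoint
  end

  # Parent implementation hook (assumed here to just record the call).
  def on_drop(endpoint)
    DemoHandler.events << [:parent, endpoint[:name]]
  end
end

DemoHost.new(DemoHandler).drop_endpoint({ name: 'peer-1' })
p DemoHandler.events # => [[:handler, "peer-1"], [:parent, "peer-1"]]
```

A handler without an on_drop method would simply be skipped, leaving only the parent entry.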

#peer_error(p, _e) ⇒ Object

Peer Error: Handles an Error from a Peer by dropping its endpoint.

Parameters:

  • p (Hash)

    Endpoint Hash

  • _e (Exception)

    Exception raised while processing the peer (currently unused)



# File 'lib/rxio/io_base.rb', line 116

def peer_error p, _e
	drop_endpoint p
end

#process_input(endpoint, chunk) ⇒ Object

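Process Input: Processes Input for an Endpoint, in the form of a data chunk. The chunk is passed through the Service Handler's filter_input, each message queued in endpoint[:msgs] is dispatched to handle_msg, and subprocess_input is called if the handler defines it. Any exception raised along the way is routed to peer_error.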

Parameters:

  • endpoint (Hash)
  • chunk (String)

    A chunk of data, as received by the socket



# File 'lib/rxio/io_base.rb', line 26

def process_input endpoint, chunk

	# Begin
	begin

		# Pass through Service Handler Module
		@service_handler.filter_input endpoint, chunk

		# Process Messages
		@service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?

		# Sub-Process Input
		@service_handler.subprocess_input endpoint if @service_handler.respond_to? :subprocess_input

		# Rescue
	rescue Exception => e

		# Peer Error
		peer_error endpoint, e
	end
end
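
As an illustration of the handler contract that process_input relies on, here is a minimal sketch of a newline-delimited handler. LineHandler, the :ibuf and :seen keys, and the line-based framing are assumptions made for this example; only filter_input, handle_msg and endpoint[:msgs] come from the listing above.

```ruby
# Hypothetical handler: LineHandler, :ibuf and :seen are illustrative assumptions.
module LineHandler
  # Buffer the raw chunk and move every complete line into endpoint[:msgs].
  def self.filter_input(endpoint, chunk)
    endpoint[:ibuf] << chunk
    while (line = endpoint[:ibuf].slice!(/\A.*?\n/))
      endpoint[:msgs] << line.chomp
    end
  end

  # Called once per queued message.
  def self.handle_msg(endpoint, msg)
    endpoint[:seen] << msg.upcase
  end
end

endpoint = { ibuf: +'', msgs: [], seen: [] }

# Mirrors the first two steps of process_input for three incoming chunks.
['hel', "lo\nwor", "ld\npartial"].each do |chunk|
  LineHandler.filter_input endpoint, chunk
  LineHandler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?
end

p endpoint[:seen] # => ["HELLO", "WORLD"]
p endpoint[:ibuf] # => "partial"
```

The trailing "partial" stays buffered until a later chunk completes the line, which is why the filtering step and the message dispatch are separate.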

#read_sock(s) ⇒ Object

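Read Socket: Reads from a given socket s in pieces of up to CHUNK_SIZE bytes until the read would block, then passes the accumulated data to process_input as a single chunk. Any exception not listed in RETRY_EXCEPTIONS (including EOFError on a closed connection) drops the endpoint instead.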

Parameters:

  • s (TCPSocket)


# File 'lib/rxio/io_base.rb', line 51

def read_sock s

	# Acquire Endpoint for Socket
	e = get_endpoint_for_sock s

	# Check Endpoint
	return unless e

	# OpenSSL doesn't play nice with 'select' because of intermediate buffering.
	# Therefore we loop until we either have an explicit error or a signal to wait.
	done = false
	chunk = ''
	rdstr = ''
	until done || !rdstr
		begin
			rdstr = s.read_nonblock(CHUNK_SIZE)
			chunk << rdstr if rdstr
		rescue Exception => ex
			if RETRY_EXCEPTIONS.include? ex.class
				done = true
			else
				rdstr = nil
			end
		end
	end

	# Drop Endpoint & Abort on Error
	return drop_endpoint e unless rdstr

	# Process Input
	process_input e, chunk
end
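
The read loop can be tried in isolation with a rough sketch like the one below. It is not the RxIO implementation: drain is a hypothetical helper, a pipe stands in for the socket, CHUNK is set to 4 (instead of 1024) so several reads are needed, and the would-block case is matched through the IO::WaitReadable module rather than an explicit class list.

```ruby
CHUNK = 4 # deliberately tiny for the demo; RxIO's CHUNK_SIZE is 1024

# Hypothetical helper: read until the descriptor would block.
# Returns [data, alive], where alive is false on EOF or a hard error.
def drain(io)
  data = +''
  loop do
    begin
      data << io.read_nonblock(CHUNK)
    rescue IO::WaitReadable
      return [data, true]  # nothing more to read for now
    rescue EOFError, SystemCallError
      return [data, false] # peer closed the connection or the read failed
    end
  end
end

r, w = IO.pipe
w.write 'hello world'
first = drain(r)
p first  # => ["hello world", true]

w.close
second = drain(r)
p second # => ["", false]
```

In the listing above the second case leads to drop_endpoint and the accumulated chunk is discarded; the sketch returns the data anyway so both outcomes are visible.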

#write_sock(s) ⇒ Object

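Write Socket: Attempts to write as many bytes as possible to a given socket s from the output buffer (e[:obuf]) of the associated endpoint, while holding the endpoint lock. Bytes that were written are removed from the buffer; a failed write is treated as zero bytes written, so the data stays queued.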

Parameters:

  • s (TCPSocket)


# File 'lib/rxio/io_base.rb', line 83

def write_sock s

	# Acquire Endpoint
	e = get_endpoint_for_sock s

	# Check Endpoint
	return unless e

	# Synchronize Endpoint
	e[:lock].synchronize do

		# Write as much as possible
		size = (s.write_nonblock e[:obuf] rescue nil) || 0
		e[:obuf].slice!(0, size) if size > 0
	end
end
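
The buffer handling can be sketched outside the library as follows. enqueue and flush_once are hypothetical helpers, a pipe stands in for the socket, and only the :lock and :obuf keys are taken from the listing above. Unlike the listing, this variation rescues only IO::WaitWritable and trims the buffer with byteslice, since write_nonblock reports a byte count while String#slice! counts characters.

```ruby
# Hypothetical helpers: enqueue and flush_once are not part of RxIO.
def enqueue(endpoint, data)
  endpoint[:lock].synchronize { endpoint[:obuf] << data }
end

# One write_sock-style pass: write what the descriptor accepts, keep the rest.
def flush_once(io, endpoint)
  endpoint[:lock].synchronize do
    obuf = endpoint[:obuf]
    size = begin
      io.write_nonblock obuf
    rescue IO::WaitWritable
      0 # descriptor is full; the data stays queued for the next pass
    end
    # write_nonblock counts bytes, so trim by bytes rather than characters.
    obuf.replace obuf.byteslice(size..-1) if size > 0
    size
  end
end

r, w = IO.pipe
endpoint = { lock: Mutex.new, obuf: +'' }

enqueue endpoint, 'queued '
enqueue endpoint, 'output'
written = flush_once(w, endpoint)
received = r.read_nonblock(1024)

p [written, endpoint[:obuf], received] # => [13, "", "queued output"]
```

Holding the lock for both the write and the trim keeps a concurrent enqueue from appending between the two steps, which appears to be the purpose of e[:lock] in the listing.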