RxIO

Reactive Sockets for Ruby

Presentation

This library is an implementation of the Reactor Pattern for Ruby Sockets. This allows easy development of fast, non-blocking services.

Installation

Gemfile

gem 'rxio'

Terminal

gem install -V rxio

Usage

Introduction

A simple service is created by spawning an instance of the RxIO::Service class, passing a Handler Module as argument to its constructor and calling its run method.

A simple client is created by spawning an instance of the RxIO::Client class, passing a Handler Module as argument to its constructor and calling its run method.

This will execute the service / client main loop. The handler module provides the specific protocol implementation through a set of methods.

The run method on both Service & Client is blocking and will only return after the main loop has terminated. A stop method is also provided to request the main loop to terminate.

SSL Support

RxIO provides support for secure communication through OpenSSL. To create a secure server, pass the ssl parameters to RxIO::Service.new:

s = RxIO::Service.new 'localhost', 4444, ExampleServiceHandler, { cert: './certificate.pem', pkey: './private.key' }
s.startup

Creating a secure client is as simple as:

c = RxIO::Client.new 'localhost', 4444, TestClientHandler, true
c.startup

SSL Params

When creating a secure client, the last argument to RxIO::Client.new is a hash through which the following can be specified:

  • verify_peer - true/false (Verify the Peer Certificate)

Ignoring SSL Certificate Errors

Although strongly discouraged for obvious security reasons, the SSL Peer Certificate verification can be disabled:

c = RxIO::Client.new 'localhost', port, TestClientHandler, true, verify_peer: false
c.startup

Running in the background

While the run / stop methods offer a simple way to implement a single service/client within the current thread, some situations may require wrapping in a separate thread.

Thank to Runify, a dedicated interface is provided for exactly this purpose: startup / shutdown / restart

startup

Calling this method will spawn a new thread around the run method, effectively starting the service / client in the 'background'.

shutdown

Calling shutdown will request the service / client to terminate and wait for the wrapper thread to complete (join). Once this method returns, the service / client has completely terminated (thread dead, all sockets closed, all clients disconnected).

restart

This method is a simple shortcut to shutdown immediately followed by startup.

Handler Interface

The following is a list of methods that should be implemented by the handler module in order to provide a valid interface to the RxIO::Service and RxIO::Client classes that will use it.

filter_input endpoint, chunk

This method is called any time a chunk of data is received on the wire. Its purpose is usually to filter protocol data and re-assemble messages. Complete messages can be enqueued for processing by pushing them into the endpoint[:msgs] array.

handle_msg endpoint, msg

Messages in the endpoint[:msgs] array are regularly de-queued and passed to the handle_msg method for handling. This is usually the entry point to most of the business logic for services.

(OPTIONAL) on_join endpoint

As soon as a endpoint is 'online' (Client is registered by the Service / Connection to Server is established by the Client), it is passed as argument to the on_join method of the handler module (if available). This allows the handler module to perform any necessary initialization tasks related to this endpoint.

(OPTIONAL) on_drop endpoint

Called by the service / client whenever the connection to the server / a client has been lost (if available). This method should release any resources used by the client.

(OPTIONAL) send_msg endpoint, msg

This method should encapsulate the message supplied as argument according to the desired protocol, and then add the result to the output buffer. While not absolutely required by the service or client classes, this method should be considered a best-practice for users to encapsulate data according to a protocol. Also, if an implementation for send_msg is provided in the Service Handler, Clients built around it will expose a shortcut 'send_msg' instance method.

Handler Base

A base module is provided to facilitate implementation of new protocols. The RxIO::HandlerBase module should be extended by the service handler module. It provides two simple I/O methods:

write endpoint, *data

Used to send one or more data chunks to an endpoint. This method pushes the chunk(s) to the output buffer, to be later sent over the wire.

This method is thread-safe - while in most cases it will be called from within the main service / client loop (often as a consequence of service logic in handle_msg), it is perfectly safe to call from any other thread. This is how most clients should behave. This can also be useful in situations where a service might generate messages for the client outside of the normal request-response cycle (such as an event-subscription service).

Note: this method does not perform any encoding or transformation on the data. It is the user's responsibility to provide protocol encoding, usually through the send_msg method.

buffer_input, endpoint, chunk

Used to add a data chunk to the endpoint's input buffer.

The Peer Hash

Most of RxIO works around a peer hash, which is used to represent information about a remote peer. This hash is built by RxIO when connecting to a service, or when accepting a client's connection request.

The hash contains a lot of information but only two elements are really important for the user:

  • :peer - A hash containing name, addr, and port of the remote peer
  • :local - A reference to the Service / Client instance on the local end.

Dropping Clients

To properly drop a client (ensuring that any buffered data is flushed out), simply set its :drop attribute to anything truthy:

# Foo Service Handler Module
module FooHandler

    # Extend BinDelim I/O Filter
    extend RxIO::IOFilters::BinDelim

    # Set Message Delimiter
    msg_delim "\n"

    # Handle Message
    def handle_msg client, msg

        # Echo message and immediately drop client
        send_msg client, msg
        client[:drop] = true
    end
end

Switching targets (for Client)

When working with a service client, sometimes it may be required to disconnect the client and re-connect it to another server.

This can be done using the set_target method, which updates the addr and port attributes of the client.

# Use foo.example.com:4432 as target for next connection
client.set_target 'foo.example.com', 4432

The next time a connection is established, it will use the new target.

When running in the background (through Runify), this can be immediately forced by restarting the client.

# Use foo.example.com:4432 as target for next connection
client.set_target 'foo.example.com', 4432

# Immediately enforce the use of the new target
client.restart

When running in single-thread mode (through the client's 'run' method), this can be immediately forced by stopping the client and then restarting it.

Provided Filters

Some generic filters are provided as part of the RxIO::IOFilters module. These can be extended directly in a custom service handler module to provide immediate handling of a variety of protocol types. Most of the provided filters usually provide the filter_input and send_msg methods.

Binary-Delimiter (BinDelim)

The binary-delimiter filter is the simplest of all filters, splitting messages according to a fixed binary string. After extending the RxIO::IOFilters::BinDelim module, the msg_delim method should be called with the desired binary string as argument.

Example
require 'rxio'

# Custom Service 'Foo'
class FooService < RxIO::Service

    # Construct
    def initialize addr, port
        super addr, port, FooHandler
    end

    # Foo Service Handler Module
    module FooHandler

        # Extend BinDelim I/O Filter
        extend RxIO::IOFilters::BinDelim

        # Set Message Delimiter
        msg_delim "\n"

        # ...
    end
end

Message-Size (MsgSize)

the message-size filter is another very basic generic protocol filter, splitting messages according to a 4-byte unsigned big-endian integer size field, which prefixes every message and indicates its length in bytes.

Example
require 'rxio'

# Custom Service 'Foo'
class FooService < RxIO::Service

    # Construct
    def initialize addr, port
        super addr, port, FooHandler
    end

    # Foo Service Handler Module
    module FooHandler

        # Extend MsgSize I/O Filter
        extend RxIO::IOFilters::MsgSize

        # ...
    end
end

Examples

A simple echo service

#!/usr/bin/env ruby

require 'rxio'

# Echo Service Class
class EchoService < RxIO::Service

    # Defaults
    DEFAULT_ADDR = '0.0.0.0'
    DEFAULT_PORT = 4444

    # Construct
    def initialize
        super DEFAULT_ADDR, DEFAULT_PORT, EchoHandler
    end

    # Echo Service Handler Module
    module EchoHandler

        # Extend BinDelim I/O Filter
        extend RxIO::IOFilters::BinDelim

        # Set Message Delimiter
        msg_delim "\n"

        # On Join
        def self.on_join client
            puts "Client connected: #{client[:peer][:name]}:#{client[:peer][:port]}"
        end

        # On Drop
        def self.on_drop client
            puts "Client dropped: #{client[:peer][:name]}:#{client[:peer][:port]}"
        end

        # Handle Message
        def self.handle_msg client, msg
            msg.chomp!              # Clean spurious \r
            puts "Got message from client: #{client[:peer][:name]}:#{client[:peer][:port]} -> \"#{msg}\""
            send_msg client, msg    # Echo message back to client
        end
    end
end

# Run the service
es = EchoService.new
es.run

A simple client for the echo service

#!/usr/bin/env ruby

require 'rxio'

# Echo Client Class
class EchoClient < RxIO::Client

    # Defaults
    DEFAULT_ADDR = '0.0.0.0'
    DEFAULT_PORT = 4444

    # Construct
    def initialize
        super DEFAULT_ADDR, DEFAULT_PORT, EchoHandler
    end

    # Echo Service Handler Module
    module EchoHandler

        # Extend BinDelim I/O Filter
        extend RxIO::IOFilters::BinDelim

        # Set Message Delimiter
        msg_delim "\n"

        # On Join
        def self.on_join server
            puts "Connected to server: #{server[:peer][:name]}:#{server[:peer][:port]}"
            send_msg server, "Hello World!"
        end

        # On Drop
        def self.on_drop server
            puts "Connection dropped: #{server[:peer][:name]}:#{server[:peer][:port]}"
        end

        # Handle Message
        def self.handle_msg server, msg
            puts "Got response from server: #{server[:peer][:name]}:#{server[:peer][:port]} -> \"#{msg}\""
            server[:client].stop        # Done - Terminate the Client!
        end
    end
end

# Run the Client
ec = EchoClient.new
ec.run

Running a service in the background

# ...

# Run the service in the background
puts 'Service is starting up...'
es = EchoService.new
es.startup
puts 'Service is online!'

# Do something while the service is running...

# Shutdown the service
es.shutdown
puts 'Service has terminated!'

Running a client in the background

# ...

# Run the client in the background
puts 'Client is starting up...'
es = EchoService.new
es.startup
puts 'Client is online!'

# Do something while the client is running...

# Shutdown the client
es.shutdown
puts 'Client has terminated!'

Dealing with Peer Errors

Whenever an error occurs as the result of handling a peer's message, it should be notified through the peer_error method in RxIO::IOBase (included in RxIO::Service). The default implementation of this method simply drops the peer. It can be overloaded to provide custom functionality such as proper logging or more complex logic.

# Default implementation for 'peer_error'
def peer_error p, e
    drop_endpoint p
end

Usage:

While the actual 'peer_error' method can be publicly called on any class that includes the RxIO::IOBase Module, it is normally not necessary. In fact, calls to both the filter_input & handle_msg callback methods (to be implemented in the Service Handler Module) are wrapped within a begin / rescue block which will call peer_error for any exception raised during either parsing or processing of messages.

#!/usr/bin/env ruby

require 'rxio'

# Echo Service Class
class EchoService < RxIO::Service

    # ...

    # Echo Service Handler Module
    module EchoHandler

        # ...

        # Handle Message
        def self.handle_msg client, msg

            # Everything is an error
            raise 'Invalid Message Structure'

            # Equivalent to:
            # return client[:serv].peer_error client, msg
        end
    end
end

# Run the service
es = EchoService.new
es.run

License

The gem is available as open source under the terms of the MIT License.