RxIO
Reactive Sockets for Ruby
Presentation
RxIO implements the Reactor pattern for Ruby sockets, making it easy to develop fast, non-blocking services.
Installation
Gemfile
gem 'rxio'
Terminal
gem install -V rxio
Usage
A simple service is created by instantiating the RxIO::Service class, passing a handler module as argument to its constructor, and then calling its run method.
This will execute the service's main loop. The handler module provides the specific protocol implementation through a set of methods.
The run method is blocking and will only return after the service has terminated.
A stop method is also provided to request the service to terminate.
Running in the background
The run / stop methods offer a simple way to run a single service within the current thread. Some situations, however, require running the service in a separate thread.
A dedicated interface is provided for exactly this: startup / shutdown.
startup
Calling this method will spawn a new thread around the run method, effectively starting the service in the 'background'.
shutdown
Calling shutdown will request the service to terminate (via the stop method) and wait for the thread to complete (join). Once this method returns, the service has completely terminated (thread dead, all clients disconnected).
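The relationship between run / stop and startup / shutdown can be sketched with a small stand-in class. This is only an illustration of the threading pattern described above: TinyService, its ticks counter and its running? helper are hypothetical and are not part of RxIO.

```ruby
# Hypothetical stand-in for a service with a blocking main loop.
# It is NOT RxIO::Service; it only mirrors the run / stop / startup / shutdown shape.
class TinyService
  attr_reader :ticks

  def initialize
    @stop = false
    @thread = nil
    @ticks = 0
  end

  # Blocking main loop: returns only after a stop has been requested.
  def run
    until @stop
      @ticks += 1
      sleep 0.01
    end
  end

  # Request termination of the main loop (returns immediately).
  def stop
    @stop = true
  end

  # Wrap the blocking run method in a new thread.
  def startup
    @thread = Thread.new { run }
  end

  # Request termination, then wait (join) until the thread has finished.
  def shutdown
    stop
    @thread.join
    @thread = nil
  end

  # True while the background thread exists and is alive.
  def running?
    !@thread.nil? && @thread.alive?
  end
end

svc = TinyService.new
svc.startup          # returns immediately; the loop runs in the background
sleep 0.05           # the calling thread is free to do other work here
svc.shutdown         # returns only once the loop has ended
```

The point of the pattern is that shutdown is synchronous: once it returns, the caller can rely on the background thread being gone.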
Handler Interface
The following is a list of methods that should be implemented by the handler module in order to provide a valid interface to the RxIO::Service class that will use it.
filter_input client, chunk
This method is called any time a chunk of data is received from a client. Its purpose is usually to filter protocol data and re-assemble messages. Complete messages can be enqueued for processing by pushing them into the client[:msgs] array.
handle_msg client, msg
Messages in the client[:msgs] array are regularly de-queued and passed to the handle_msg method for handling. This is usually the entry point to most of the service logic.
(OPTIONAL) on_join client
As soon as a client is registered by the service, it is passed as argument to the on_join method of the handler module (if available). This allows the handler module to perform any necessary initialization tasks related to this client.
(OPTIONAL) on_drop client
Called by the service whenever a client is about to be disconnected (if available). This method should release any resources used by the client.
(OPTIONAL) send_msg client, msg
This method should wrap the message supplied as argument according to the desired protocol, and then add the result to the output buffer. Although the service class does not require it, implementing this method is considered best practice, as it keeps protocol encoding in one place.
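To show how the handler methods above fit together, here is a minimal sketch that needs no network connection. UpcaseHandler, the drive method and the client[:obuf] / client[:log] keys are hypothetical stand-ins introduced for illustration; only client[:msgs] comes from the description above, and the toy driver is an assumption about call order, not RxIO's actual service loop.

```ruby
# Hypothetical handler: treats every received chunk as one complete message.
module UpcaseHandler
  # Optional: per-client initialization.
  def self.on_join(client)
    client[:log] = ['join']
  end

  # Called for each received chunk; complete messages go into client[:msgs].
  def self.filter_input(client, chunk)
    client[:msgs] << chunk
  end

  # Called for each de-queued message; entry point of the service logic.
  def self.handle_msg(client, msg)
    client[:log] << "msg:#{msg}"
    send_msg client, msg.upcase
  end

  # Optional: protocol encoding for outgoing messages (newline-terminated here).
  # client[:obuf] is a hypothetical stand-in for the output buffer.
  def self.send_msg(client, msg)
    client[:obuf] << "#{msg}\n"
  end

  # Optional: release per-client resources.
  def self.on_drop(client)
    client[:log] << 'drop'
  end
end

# Toy driver standing in for the service loop (assumption, not RxIO code).
def drive(handler, chunks)
  client = { msgs: [], obuf: +'' }
  handler.on_join(client) if handler.respond_to?(:on_join)
  chunks.each do |chunk|
    handler.filter_input(client, chunk)
    handler.handle_msg(client, client[:msgs].shift) until client[:msgs].empty?
  end
  handler.on_drop(client) if handler.respond_to?(:on_drop)
  client
end

client = drive(UpcaseHandler, %w[hello world])
print client[:obuf]       # HELLO and WORLD, one per line
p client[:log]            # ["join", "msg:hello", "msg:world", "drop"]
```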
Handler Base
A base module is provided to facilitate implementation of new services and protocols. The RxIO::HandlerBase module should be extended by the service handler module. It provides two simple I/O methods:
write client, *data
Used to send one or more data chunks to a client. This method pushes the chunk(s) to the output buffer, to be later sent to the client.
This method is thread-safe. In most cases it will be called from within the main service loop (generally as a consequence of service logic in handle_msg), but it is perfectly safe to call from any other thread. This is useful when a service generates messages for the client outside of the normal request-response cycle (such as an event-subscription service).
Note: this method does not perform any encoding or transformation on the data. It is the user's responsibility to provide protocol encoding, usually through the send_msg method.
buffer_input client, chunk
Used to add a data chunk to the client's input buffer.
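The thread-safety property described for write can be illustrated with a Mutex-guarded buffer. How RxIO achieves this internally is not stated here, so the OutputBuffer class and its drain method below are hypothetical; the sketch only shows why several threads can push chunks without interleaving a single call's data.

```ruby
# Hypothetical output buffer guarded by a Mutex; illustrates the idea only.
class OutputBuffer
  def initialize
    @lock = Mutex.new
    @data = +''
  end

  # Append one or more chunks atomically and as-is (no encoding is applied).
  def write(*chunks)
    @lock.synchronize { chunks.each { |c| @data << c } }
  end

  # Remove and return everything buffered so far.
  def drain
    @lock.synchronize do
      out = @data
      @data = +''
      out
    end
  end
end

buf = OutputBuffer.new

# Four producer threads, e.g. event sources outside the request-response cycle.
threads = 4.times.map do |i|
  Thread.new { 100.times { buf.write "event #{i}", "\n" } }
end
threads.each(&:join)

out = buf.drain
puts out.lines.size   # 400 complete lines, none interleaved
```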
Provided Filters
Some generic filters are provided as part of the RxIO::IOFilters module. These can be extended directly in a custom service handler module to provide immediate handling of a variety of protocol types. Most of them provide the filter_input and send_msg methods.
Binary-Delimiter (BinDelim)
The binary-delimiter filter is the simplest of all filters, splitting messages according to a fixed binary string. After extending the RxIO::IOFilters::BinDelim module, the msg_delim method should be called with the desired binary string as argument.
Example
require 'rxio'

# Custom Service 'Foo'
class FooService < RxIO::Service

  # Construct
  def initialize addr, port
    super addr, port, FooHandler
  end

  # Foo Service Handler Module
  module FooHandler

    # Extend BinDelim I/O Filter
    extend RxIO::IOFilters::BinDelim

    # Set Message Delimiter
    msg_delim "\n"

    # ...
  end
end
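The splitting a delimiter-based filter has to perform, including messages that arrive across several chunks, can be sketched in plain Ruby. The split_messages method below is a hypothetical illustration of the technique and not the BinDelim implementation.

```ruby
# Hypothetical delimiter-based splitter; not the library's code.
# buffer: binary String carrying leftover bytes between calls (mutated in place)
# chunk:  newly received data
# delim:  binary delimiter string
# Returns the complete messages found, with the delimiter removed.
def split_messages(buffer, chunk, delim)
  buffer << chunk.b
  msgs = []
  while (idx = buffer.index(delim))
    # Cut message + delimiter off the front, then keep only the message part.
    msgs << buffer.slice!(0, idx + delim.bytesize)[0, idx]
  end
  msgs
end

buf = ''.b
delim = "\r\n".b

p split_messages(buf, "foo\r\nba", delim)   # ["foo"]
p split_messages(buf, "r\r\nbaz", delim)    # ["bar"]
p buf                                       # "baz" (incomplete, waits for more data)
```

Keeping the leftover bytes in the buffer is what lets a message that is split over two reads come out whole on the second call.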
Message-Size (MsgSize)
The message-size filter is another very basic generic protocol filter, splitting messages according to a 4-byte unsigned big-endian integer size field, which prefixes every message and indicates its length in bytes.
Example
require 'rxio'

# Custom Service 'Foo'
class FooService < RxIO::Service

  # Construct
  def initialize addr, port
    super addr, port, FooHandler
  end

  # Foo Service Handler Module
  module FooHandler

    # Extend MsgSize I/O Filter
    extend RxIO::IOFilters::MsgSize

    # ...
  end
end
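The wire format described for the message-size filter (a 4-byte unsigned big-endian length followed by the payload) can be reproduced with Ruby's pack / unpack1 and the 'N' directive. The frame and unframe! methods below are hypothetical helpers written for illustration, for instance to build a test client; they are not part of RxIO.

```ruby
# Encode: prefix the payload with its byte length as a 4-byte unsigned
# big-endian integer ('N' directive).
def frame(payload)
  [payload.bytesize].pack('N') + payload.b
end

# Decode: pull every complete message out of buffer (mutated in place),
# leaving any partial message behind for the next call.
def unframe!(buffer)
  msgs = []
  while buffer.bytesize >= 4
    size = buffer.unpack1('N')             # read the size field
    break if buffer.bytesize < 4 + size    # payload not fully received yet
    msgs << buffer.slice!(0, 4 + size)[4, size]
  end
  msgs
end

wire = frame('hello') + frame('hi')

buf = ''.b
buf << wire[0, 6]          # size field plus the first two payload bytes
p unframe!(buf)            # []
buf << wire[6..-1]         # the rest arrives
p unframe!(buf)            # ["hello", "hi"]
```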
Examples
A simple echo service
#!/usr/bin/env ruby

require 'rxio'

# Echo Service Class
class EchoService < RxIO::Service

  # Defaults
  DEFAULT_ADDR = '0.0.0.0'
  DEFAULT_PORT = 4444

  # Construct
  def initialize
    super DEFAULT_ADDR, DEFAULT_PORT, EchoHandler
  end

  # Echo Service Handler Module
  module EchoHandler

    # Extend BinDelim I/O Filter
    extend RxIO::IOFilters::BinDelim

    # Set Message Delimiter
    msg_delim "\n"

    # On Join
    def self.on_join client
      puts "Client connected: #{client[:peer][:name]}:#{client[:peer][:port]}"
    end

    # On Drop
    def self.on_drop client
      puts "Client dropped: #{client[:peer][:name]}:#{client[:peer][:port]}"
    end

    # Handle Message
    def self.handle_msg client, msg
      msg.chomp! # Clean spurious \r
      puts "Got message from client: #{client[:peer][:name]}:#{client[:peer][:port]} -> \"#{msg}\""
      send_msg client, msg # Echo message back to client
    end
  end
end

# Run the service
es = EchoService.new
es.run
Running a service in the background
# ...

# Run the service in the background
puts 'Service is starting up...'
es = EchoService.new
es.startup
puts 'Service is online!'

# Do something while the service is running...

# Shutdown the service
es.shutdown
puts 'Service has terminated!'
License
The gem is available as open source under the terms of the MIT License.