Class: Cql::Io::IoReactor
- Inherits: Object
- Defined in: lib/cql/io/io_reactor.rb
Overview
An IO reactor takes care of all the IO for a client. It opens new connections, makes sure that connections with data to send flush it to the network, and makes sure that connections with incoming data read it and delegate it to their protocol handlers.
All IO is done in a single background thread, regardless of how many connections you open. There shouldn’t be any problems handling hundreds of connections if needed. All operations are thread safe, but you should take great care in your protocol handlers to make sure that they don’t do too much work in their data handling callbacks: those callbacks run in the reactor thread, and every cycle you use there is a cycle that can’t be used to handle IO.
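One way to keep data handling callbacks cheap is to hand the received bytes to a worker thread and return immediately. The following is a minimal sketch using only Ruby's standard library; `OffloadingHandler`, `receive_data` and `expensive_parse` are hypothetical names chosen for illustration and are not part of this library.

```ruby
require 'thread'

# Hypothetical handler: the callback only enqueues, a worker thread does the
# expensive work, so the reactor thread is never blocked for long.
class OffloadingHandler
  attr_reader :results

  def initialize
    @inbox = Queue.new
    @results = Queue.new
    @worker = Thread.new do
      # A nil item is the shutdown signal and ends the loop.
      while (chunk = @inbox.pop)
        @results << expensive_parse(chunk)
      end
    end
  end

  # Assumed to be invoked from the reactor thread; it must return quickly.
  def receive_data(chunk)
    @inbox << chunk
  end

  # Asks the worker to finish the queued chunks and waits for it to exit.
  def shutdown
    @inbox << nil
    @worker.join
  end

  private

  # Placeholder for real parsing or processing work.
  def expensive_parse(chunk)
    chunk.upcase.reverse
  end
end
```

With a single worker the results come out in the order the chunks arrived. If ordering does not matter, several workers could pop from the same queue.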
The IO reactor is completely protocol agnostic. It’s up to the specified protocol handler factory to create objects that can interpret the bytes received from remote hosts and send the correct commands back. When you create an IO reactor you provide a factory that can create protocol handler objects (most of the time this factory is just a class, but it could be any object that responds to #new). When you call #connect, a new protocol handler instance is created and passed a connection (along with the reactor itself). The protocol handler can then register to receive data that arrives over the socket, and it can write data to the socket. It can also register to be notified when the socket is closed, or it can itself close the socket.
See Protocol::CqlProtocolHandler for an example of how the CQL protocol is implemented. There is also an integration test that implements the Redis protocol that you can look at.
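To make the shape of a protocol handler more concrete, here is a small sketch of a handler that echoes back whatever it receives. The constructor arguments mirror the `new(connection, self)` call in #connect. The connection method names `on_data`, `on_closed` and `write` are assumptions made for this example, and `FakeConnection` is a hypothetical stand-in so that the sketch runs without a network or this library.

```ruby
# Hypothetical stand-in for the connection object handed to a handler.
# The method names on_data, on_closed and write are assumed for illustration.
class FakeConnection
  attr_reader :written

  def initialize
    @written = []
    @data_listener = nil
    @closed_listener = nil
  end

  def on_data(&listener)
    @data_listener = listener
  end

  def on_closed(&listener)
    @closed_listener = listener
  end

  def write(bytes)
    @written << bytes
  end

  # Helpers that simulate what a reactor would do when IO happens.
  def simulate_incoming(bytes)
    @data_listener.call(bytes) if @data_listener
  end

  def simulate_close
    @closed_listener.call if @closed_listener
  end
end

# A class is the simplest factory: the reactor only needs it to respond to #new.
class EchoHandler
  attr_reader :closed

  def initialize(connection, reactor)
    @connection = connection
    @reactor = reactor
    @closed = false
    # Keep the callbacks short, since they would run in the reactor thread.
    @connection.on_data { |bytes| @connection.write(bytes) }
    @connection.on_closed { @closed = true }
  end
end

connection = FakeConnection.new
handler = EchoHandler.new(connection, nil) # nil stands in for the reactor
connection.simulate_incoming('PING')
connection.simulate_close
```

After this runs, `connection.written` holds the echoed bytes and `handler.closed` is true.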
Instance Method Summary
- #connect(host, port, timeout) ⇒ Cql::Future
  Opens a connection to the specified host and port.
- #initialize(protocol_handler_factory, options = {}) ⇒ IoReactor (constructor)
  Initializes a new IO reactor.
- #on_error {|error| ... } ⇒ Object
  Register to receive notifications when the reactor shuts down because of an irrecoverable error.
- #running? ⇒ Boolean
  Returns true as long as the reactor is running.
- #schedule_timer(timeout) ⇒ Cql::Future
  Returns a future that completes after the specified number of seconds.
- #start ⇒ Cql::Future
  Starts the reactor.
- #stop ⇒ Cql::Future
  Stops the reactor.
- #to_s ⇒ Object
Constructor Details
#initialize(protocol_handler_factory, options = {}) ⇒ IoReactor
Initializes a new IO reactor.
    # File 'lib/cql/io/io_reactor.rb', line 90
    def initialize(protocol_handler_factory, options={})
      @protocol_handler_factory = protocol_handler_factory
      @clock = options[:clock] || Time
      @unblocker = Unblocker.new
      @io_loop = IoLoopBody.new(options)
      @io_loop.add_socket(@unblocker)
      @running = false
      @stopped = false
      @started_promise = Promise.new
      @stopped_promise = Promise.new
      @lock = Mutex.new
    end
Instance Method Details
#connect(host, port, timeout) ⇒ Cql::Future
Opens a connection to the specified host and port.
This method is asynchronous and returns a future which completes when the connection has been established, or fails if the connection cannot be established for some reason (the connection takes longer than the specified timeout, the remote host cannot be found, etc.).
The object returned in the future will be an instance of the protocol handler class you passed to #initialize.
    # File 'lib/cql/io/io_reactor.rb', line 184
    def connect(host, port, timeout)
      connection = Connection.new(host, port, timeout, @unblocker, @clock)
      f = connection.connect
      protocol_handler = @protocol_handler_factory.new(connection, self)
      @io_loop.add_socket(connection)
      @unblocker.unblock!
      f.map { protocol_handler }
    end
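The last line of #connect, `f.map { protocol_handler }`, is why the returned future yields the protocol handler rather than the raw connection. The sketch below illustrates that idea with `TinyFuture`, a hypothetical and deliberately simplified stand-in. It is not Cql::Future and ignores failures and thread safety; it only shows how mapping a future swaps the value that listeners eventually see.

```ruby
# Hypothetical, simplified future used only to illustrate #map.
class TinyFuture
  def initialize
    @listeners = []
    @resolved = false
    @value = nil
  end

  # Completes the future and notifies everyone who registered earlier.
  def resolve(value)
    @value = value
    @resolved = true
    @listeners.each { |listener| listener.call(value) }
    @listeners.clear
  end

  # Calls the listener now if already resolved, otherwise on resolution.
  def on_value(&listener)
    if @resolved
      listener.call(@value)
    else
      @listeners << listener
    end
  end

  # Returns a new future that resolves with the block's result.
  def map(&block)
    mapped = TinyFuture.new
    on_value { |value| mapped.resolve(block.call(value)) }
    mapped
  end
end

connection_future = TinyFuture.new
protocol_handler = Object.new
# Same shape as in #connect: the block ignores the connection result.
handler_future = connection_future.map { |_connection| protocol_handler }

received = nil
handler_future.on_value { |value| received = value }
connection_future.resolve(:connection_established)
```

Once `connection_future` resolves, `received` refers to `protocol_handler`, which matches the documented behaviour that callers get a handler instance back.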
#on_error {|error| ... } ⇒ Object
Register to receive notifications when the reactor shuts down because of an irrecoverable error.
The listener block will be called in the reactor thread. Any errors that it raises will be ignored.
    # File 'lib/cql/io/io_reactor.rb', line 111
    def on_error(&listener)
      @stopped_promise.future.on_failure(&listener)
    end
#running? ⇒ Boolean
Returns true as long as the reactor is running. It will be true even after #stop has been called, but false when the future returned by #stop completes.
    # File 'lib/cql/io/io_reactor.rb', line 119
    def running?
      @running
    end
#schedule_timer(timeout) ⇒ Cql::Future
Returns a future that completes after the specified number of seconds.
    # File 'lib/cql/io/io_reactor.rb', line 199
    def schedule_timer(timeout)
      @io_loop.schedule_timer(timeout)
    end
#start ⇒ Cql::Future
Starts the reactor. This will spawn a background thread that will manage all connections.
This method is asynchronous and returns a future which completes when the reactor has started.
    # File 'lib/cql/io/io_reactor.rb', line 130
    def start
      @lock.synchronize do
        raise ReactorError, 'Cannot start a stopped IO reactor' if @stopped
        return @started_promise.future if @running
        @running = true
      end
      Thread.start do
        @started_promise.fulfill(self)
        begin
          @io_loop.tick until @stopped
        ensure
          @io_loop.close_sockets
          @io_loop.cancel_timers
          @running = false
          if $!
            @stopped_promise.fail($!)
          else
            @stopped_promise.fulfill(self)
          end
        end
      end
      @started_promise.future
    end
#stop ⇒ Cql::Future
Stops the reactor.
This method is asynchronous and returns a future which completes when the reactor has completely stopped, or fails with an error if the reactor stops or has already stopped because of a failure.
    # File 'lib/cql/io/io_reactor.rb', line 162
    def stop
      @stopped = true
      @stopped_promise.future
    end
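The interplay between #start, #stop and #running? can be easier to follow in a stripped-down model. The sketch below is a toy, not the library's implementation: `ToyReactor` and `wait_until_stopped` are hypothetical names, queues stand in for the promises, and a short sleep stands in for an IO loop tick. It shows that #stop only sets a flag, that the background thread notices the flag on its next iteration, and that a stopped reactor cannot be restarted.

```ruby
require 'thread'

# Toy model of the reactor lifecycle (hypothetical, for illustration only).
class ToyReactor
  def initialize
    @running = false
    @stopped = false
    @lock = Mutex.new
    @started_signal = Queue.new # stands in for the started promise
    @stopped_signal = Queue.new # stands in for the stopped promise
  end

  def running?
    @running
  end

  # Spawns the loop thread and waits until it has started.
  def start
    @lock.synchronize do
      raise 'Cannot start a stopped reactor' if @stopped
      return self if @running
      @running = true
    end
    Thread.new do
      @started_signal << :started
      begin
        sleep(0.01) until @stopped # stands in for one IO loop tick
      ensure
        @running = false
        @stopped_signal << :stopped
      end
    end
    @started_signal.pop
    self
  end

  # Only requests a stop; the loop thread does the actual shutdown.
  def stop
    @stopped = true
    self
  end

  # Blocks until the loop thread has finished shutting down.
  def wait_until_stopped
    @stopped_signal.pop
    self
  end
end

reactor = ToyReactor.new
reactor.start
running_after_start = reactor.running?
reactor.stop.wait_until_stopped
running_after_stop = reactor.running?
```

In the real reactor the equivalent of `wait_until_stopped` is waiting for the future returned by #stop to complete.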
#to_s ⇒ Object
    # File 'lib/cql/io/io_reactor.rb', line 203
    def to_s
      @io_loop.to_s
    end