Class: Ione::Io::IoReactor
- Inherits: Object
  - Object
  - Ione::Io::IoReactor
- Defined in: lib/ione/io/io_reactor.rb
Overview
An IO reactor takes care of all the IO for a client. It opens new connections, makes sure that connections with data to send flush it to the network, and makes sure that connections with incoming data read it and delegate it to their protocol handlers.
All IO is done in a single background thread, regardless of how many connections you open. There shouldn't be any problems handling hundreds of connections if needed. All operations are thread safe, but take great care that your protocol handlers don't do too much work in their data handling callbacks: those callbacks run in the reactor thread, and every cycle spent there is a cycle that can't be used to handle IO.
The IO reactor is completely protocol agnostic; it's up to you to create objects that can interpret the bytes received from remote hosts and send the correct commands back. When you open a connection you can provide a protocol handler factory as a block (or you can simply wrap the returned connection). This factory can create objects that wrap the raw connections, register to receive new data, and write data to the connection. Such an object can also register to be notified when the socket is closed, or it can close the socket itself.
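As a rough illustration of the wrapping pattern described above, here is a minimal sketch of a line-oriented protocol handler. `FakeConnection` and `LineProtocolHandler` are hypothetical names defined only for this example, and the sketch assumes the connection object exposes `on_data`, `on_closed` and `write`; that interface is an assumption, since this page does not document the connection class.

```ruby
# Hypothetical stand-in for a reactor connection, so the sketch runs offline.
# The on_data / on_closed / write interface is an assumption of this example.
class FakeConnection
  attr_reader :written

  def initialize
    @written = []
    @data_listener = nil
    @closed_listener = nil
  end

  def on_data(&listener)
    @data_listener = listener
  end

  def on_closed(&listener)
    @closed_listener = listener
  end

  def write(bytes)
    @written << bytes
  end

  # Helpers that simulate what a reactor thread would do.
  def receive(bytes)
    @data_listener.call(bytes) if @data_listener
  end

  def close(cause = nil)
    @closed_listener.call(cause) if @closed_listener
  end
end

# Wraps a raw connection and turns the byte stream into complete lines.
class LineProtocolHandler
  attr_reader :lines

  def initialize(connection)
    @connection = connection
    @buffer = +''
    @lines = []
    @closed = false
    @connection.on_data { |bytes| process(bytes) }
    @connection.on_closed { |_cause| @closed = true }
  end

  def closed?
    @closed
  end

  def send_line(line)
    @connection.write("#{line}\n")
  end

  private

  # Keep this cheap: with a real reactor it would run on the reactor thread.
  def process(bytes)
    @buffer << bytes
    while (newline_index = @buffer.index("\n"))
      @lines << @buffer.slice!(0, newline_index + 1).chomp
    end
  end
end

connection = FakeConnection.new
handler = LineProtocolHandler.new(connection)
connection.receive("hel")
connection.receive("lo\nwor")
connection.receive("ld\n")
handler.send_line('ping')
handler.lines # => ["hello", "world"]
```

With a real reactor, a handler like this would typically be created in the block passed to #connect, which receives the connection once it has been opened.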
Constant Summary
- PENDING_STATE = 0
- RUNNING_STATE = 1
- CRASHED_STATE = 2
- STOPPING_STATE = 3
- STOPPED_STATE = 4
Instance Method Summary
- #bind(host, port, options = nil) {|acceptor| ... } ⇒ Ione::Future
  Starts a server bound to the specified host and port.
- #cancel_timer(timer_future) ⇒ Object
  Cancels a previously scheduled timer.
- #connect(host, port, options = nil) {|connection| ... } ⇒ Ione::Future
  Opens a connection to the specified host and port.
- #initialize(options = {}) ⇒ IoReactor (constructor)
  Initializes a new IO reactor.
- #on_error {|error| ... } ⇒ Object
  Register to receive notifications when the reactor shuts down because of an irrecoverable error.
- #running? ⇒ Boolean
  Returns true as long as the reactor is running.
- #schedule_timer(timeout) ⇒ Ione::Future
  Returns a future that completes after the specified number of seconds.
- #start ⇒ Ione::Future
  Starts the reactor.
- #stop ⇒ Ione::Future
  Stops the reactor.
- #to_s ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ IoReactor
Initializes a new IO reactor.
    # File 'lib/ione/io/io_reactor.rb', line 93
    def initialize(options={})
      @options = options
      @clock = options[:clock] || Time
      @state = PENDING_STATE
      @error_listeners = []
      @io_loop = IoLoopBody.new(@options)
      @scheduler = Scheduler.new
      @lock = Mutex.new
    end
Instance Method Details
#bind(host, port, options = nil) {|acceptor| ... } ⇒ Ione::Future
Starts a server bound to the specified host and port.
A server is represented by an Acceptor, which wraps the server socket and accepts client connections. By registering to be notified on new connections, via Acceptor#on_accept, you can attach your server handling code to a connection.
    # File 'lib/ione/io/io_reactor.rb', line 308
    def bind(host, port, options=nil, &block)
      if options.is_a?(Integer) || options.nil?
        backlog = options || 5
        ssl_context = nil
      elsif options
        backlog = options[:backlog] || 5
        ssl_context = options[:ssl]
      end
      if ssl_context
        server = SslAcceptor.new(host, port, backlog, @unblocker, self, ssl_context)
      else
        server = Acceptor.new(host, port, backlog, @unblocker, self)
      end
      f = server.bind
      @io_loop.add_socket(server)
      @unblocker.unblock if running?
      f = f.map(&block) if block_given?
      f
    end
#cancel_timer(timer_future) ⇒ Object
Cancels a previously scheduled timer.
The timer will fail with a CancelledError.
    # File 'lib/ione/io/io_reactor.rb', line 349
    def cancel_timer(timer_future)
      @scheduler.cancel_timer(timer_future)
    end
#connect(host, port, options = nil) {|connection| ... } ⇒ Ione::Future
Opens a connection to the specified host and port.
    # File 'lib/ione/io/io_reactor.rb', line 227
    def connect(host, port, options=nil, &block)
      if options.is_a?(Numeric) || options.nil?
        timeout = options || 5
        ssl = false
      elsif options
        timeout = options[:timeout] || 5
        ssl = options[:ssl]
      end
      connection = Connection.new(host, port, timeout, @unblocker, @clock)
      f = connection.connect
      @io_loop.add_socket(connection)
      @unblocker.unblock if running?
      if ssl
        f = f.flat_map do
          ssl_context = ssl == true ? nil : ssl
          upgraded_connection = SslConnection.new(host, port, connection.to_io, @unblocker, ssl_context)
          ff = upgraded_connection.connect
          @io_loop.remove_socket(connection)
          @io_loop.add_socket(upgraded_connection)
          @unblocker.unblock
          ff
        end
      end
      f = f.map(&block) if block_given?
      f
    end
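The third argument to #connect is read either as a bare timeout in seconds or as an options hash with `:timeout` and `:ssl` keys, with the timeout defaulting to 5. The following stand-alone sketch mirrors that branching in simplified form so the accepted shapes are easy to see; `normalize_connect_options` is a hypothetical helper written for this example and is not part of the library.

```ruby
# Simplified, stand-alone mirror of the option handling in #connect.
# Hypothetical helper for illustration; returns [timeout_in_seconds, ssl].
def normalize_connect_options(options = nil)
  if options.is_a?(Numeric) || options.nil?
    # A bare number (or nothing) is the timeout; SSL stays off.
    [options || 5, false]
  else
    # A hash may carry :timeout and :ssl; the timeout still defaults to 5.
    [options[:timeout] || 5, options[:ssl]]
  end
end

normalize_connect_options                          # => [5, false]
normalize_connect_options(2.5)                     # => [2.5, false]
normalize_connect_options(timeout: 10, ssl: true)  # => [10, true]
normalize_connect_options(ssl: true)               # => [5, true]
```

In the listing above, an `:ssl` value of `true` results in a `nil` SSL context being passed on, while any other truthy value is passed through as the context itself.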
#on_error {|error| ... } ⇒ Object
Register to receive notifications when the reactor shuts down because of an irrecoverable error.
The listener block will be called in the reactor thread. Any errors that it raises will be ignored.
    # File 'lib/ione/io/io_reactor.rb', line 110
    def on_error(&listener)
      @lock.lock
      begin
        @error_listeners = @error_listeners.dup
        @error_listeners << listener
      ensure
        @lock.unlock
      end
      if @state == RUNNING_STATE || @state == CRASHED_STATE
        @stopped_promise.future.on_failure(&listener)
      end
    end
#running? ⇒ Boolean
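Returns true as long as the reactor is running. This includes the time during which the reactor is in the process of stopping.
    # File 'lib/ione/io/io_reactor.rb', line 126
    def running?
      (state = @state) == RUNNING_STATE || state == STOPPING_STATE
    end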
#schedule_timer(timeout) ⇒ Ione::Future
Returns a future that completes after the specified number of seconds.
    # File 'lib/ione/io/io_reactor.rb', line 339
    def schedule_timer(timeout)
      @scheduler.schedule_timer(timeout)
    end
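Both timer methods delegate to a scheduler that the reactor loop ticks once per iteration, and the constructor accepts a `:clock` option. The sketch below shows one way such a tick-driven scheduler with an injectable clock could work. It is not the library's `Scheduler`: `SketchScheduler` and `FakeClock` are hypothetical classes, and plain callbacks stand in for futures (the real #schedule_timer returns a future, and a cancelled timer fails with a CancelledError).

```ruby
# Illustrative tick-driven timer scheduler; NOT Ione's Scheduler class.
# Callbacks stand in for futures to keep the sketch dependency-free.
class SketchScheduler
  Timer = Struct.new(:expires_at, :on_fire, :on_cancel)

  def initialize(clock = Time)
    @clock = clock
    @timers = []
  end

  # Registers a timer that fires on the first tick at or after its expiry.
  def schedule_timer(timeout, on_cancel: nil, &on_fire)
    timer = Timer.new(@clock.now + timeout, on_fire, on_cancel)
    @timers << timer
    timer
  end

  # Removes a pending timer and notifies it of the cancellation.
  def cancel_timer(timer)
    return unless @timers.delete(timer)
    timer.on_cancel&.call
  end

  # Called once per loop iteration: fires every timer that has expired.
  def tick
    now = @clock.now
    expired, @timers = @timers.partition { |timer| timer.expires_at <= now }
    expired.each { |timer| timer.on_fire.call }
  end
end

# A fake clock makes the behaviour deterministic.
class FakeClock
  attr_accessor :now

  def initialize
    @now = 0.0
  end
end

clock = FakeClock.new
scheduler = SketchScheduler.new(clock)
events = []
scheduler.schedule_timer(5) { events << :fired }
doomed = scheduler.schedule_timer(1, on_cancel: -> { events << :cancelled }) { events << :never }
scheduler.cancel_timer(doomed)
clock.now = 4.9
scheduler.tick # nothing has expired yet
clock.now = 5.0
scheduler.tick # the 5-second timer fires
events # => [:cancelled, :fired]
```

Because timers only fire from `tick`, their resolution is bounded by how often the loop iterates, which is one more reason to keep work on the reactor thread short.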
#start ⇒ Ione::Future
Starts the reactor. This will spawn a background thread that will manage all connections.
This method is asynchronous and returns a future which completes when the reactor has started.
    # File 'lib/ione/io/io_reactor.rb', line 137
    def start
      @lock.synchronize do
        if @state == RUNNING_STATE
          return @started_promise.future
        elsif @state == STOPPING_STATE
          return @stopped_promise.future.flat_map { start }.fallback { start }
        else
          @state = RUNNING_STATE
        end
      end
      @unblocker = Unblocker.new
      @io_loop.add_socket(@unblocker)
      @started_promise = Promise.new
      @stopped_promise = Promise.new
      @error_listeners.each do |listener|
        @stopped_promise.future.on_failure(&listener)
      end
      Thread.start do
        @started_promise.fulfill(self)
        error = nil
        begin
          while @state == RUNNING_STATE
            @io_loop.tick
            @scheduler.tick
          end
        rescue => e
          error = e
        ensure
          begin
            @io_loop.close_sockets
            @scheduler.cancel_timers
            @unblocker = nil
          ensure
            if error
              @state = CRASHED_STATE
              @stopped_promise.fail(error)
            else
              @state = STOPPED_STATE
              @stopped_promise.fulfill(self)
            end
          end
        end
      end
      @started_promise.future
    end
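The lifecycle in the listing above boils down to a small state machine: the loop keeps ticking while the state is "running", ends up "stopped" after a clean exit, and ends up "crashed" if a tick raises. The sketch below reproduces only that shape. `SketchLoop` is a hypothetical class, it has no sockets or timers, and it uses `Thread#join` where the reactor uses futures.

```ruby
# Illustrative state machine only; NOT Ione's implementation.
class SketchLoop
  PENDING = :pending
  RUNNING = :running
  STOPPING = :stopping
  STOPPED = :stopped
  CRASHED = :crashed

  attr_reader :state, :error, :ticks

  def initialize(&work)
    @work = work
    @state = PENDING
    @ticks = 0
    @lock = Mutex.new
  end

  # Spawns the background thread unless the loop is already running.
  def start
    @lock.synchronize do
      return self if @state == RUNNING
      @state = RUNNING
      @thread = Thread.new { run }
    end
    self
  end

  # Asks the loop to finish its current iteration, then waits for it.
  def stop
    @lock.synchronize { @state = STOPPING if @state == RUNNING }
    join
  end

  def join
    @thread&.join
    self
  end

  private

  def run
    while @state == RUNNING
      @work.call
      @ticks += 1
    end
    @state = STOPPED
  rescue => e
    @error = e
    @state = CRASHED
  end
end

ticked = Queue.new
reactor = SketchLoop.new { ticked << :tick; sleep 0.001 }
reactor.start
ticked.pop # block until the loop has run at least once
reactor.stop
reactor.state # => :stopped

crashed = SketchLoop.new { raise 'boom' }.start.join
crashed.state # => :crashed
```

A crash is recorded rather than re-raised, which corresponds to how the reactor fails its stopped future with the error so that listeners registered through #on_error can react.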
#stop ⇒ Ione::Future
Stops the reactor.
This method is asynchronous and returns a future which completes when the reactor has completely stopped, or fails with an error if the reactor stops or has already stopped because of a failure.
    # File 'lib/ione/io/io_reactor.rb', line 190
    def stop
      @lock.synchronize do
        if @state == PENDING_STATE
          Future.resolved(self)
        elsif @state != STOPPED_STATE && @state != CRASHED_STATE
          @state = STOPPING_STATE
          @stopped_promise.future
        else
          @stopped_promise.future
        end
      end
    end
#to_s ⇒ Object
    # File 'lib/ione/io/io_reactor.rb', line 353
    def to_s
      state_constant_name = self.class.constants.find do |name|
        name.to_s.end_with?('_STATE') && self.class.const_get(name) == @state
      end
      state = state_constant_name.to_s.rpartition('_').first
      %(#<#{self.class.name} #{state}>)
    end
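To make the resulting string format concrete, the following sketch applies the same constant lookup to a small stand-in class. `StateLabelDemo` and its `UNRELATED` constant are hypothetical and exist only for this example.

```ruby
# Hypothetical stand-in class that reuses the lookup from #to_s above.
class StateLabelDemo
  PENDING_STATE = 0
  RUNNING_STATE = 1
  UNRELATED = 1 # skipped: the name does not end with _STATE

  def initialize(state)
    @state = state
  end

  def to_s
    # Find the *_STATE constant whose value matches the current state.
    state_constant_name = self.class.constants.find do |name|
      name.to_s.end_with?('_STATE') && self.class.const_get(name) == @state
    end
    # "RUNNING_STATE" becomes "RUNNING"; an unknown state yields "".
    state = state_constant_name.to_s.rpartition('_').first
    %(#<#{self.class.name} #{state}>)
  end
end

running_label = StateLabelDemo.new(StateLabelDemo::RUNNING_STATE).to_s
pending_label = StateLabelDemo.new(StateLabelDemo::PENDING_STATE).to_s
unknown_label = StateLabelDemo.new(99).to_s
```

Here `running_label` ends with ` RUNNING>` and `pending_label` with ` PENDING>`. For a value that matches no `_STATE` constant, the state part is empty, so `unknown_label` ends with a space followed by `>`.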