Class: Ione::Io::IoReactor

Inherits:
Object
Defined in:
lib/ione/io/io_reactor.rb

Overview

An IO reactor takes care of all the IO for a client. It opens new connections, makes sure that connections with data to send flush it to the network, and makes sure that connections with incoming data read it and delegate it to their protocol handlers.

All IO is done in a single background thread, regardless of how many connections you open. Handling hundreds of connections should not be a problem. All operations are thread safe, but take great care in your protocol handlers that their data handling callbacks don't do too much work: the callbacks run in the reactor thread, and every cycle spent there is a cycle that can't be used to handle IO.
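One way to keep callbacks cheap is to have them only enqueue the received data and let another thread do the expensive part. The following is a minimal sketch of that idea using only Ruby's built-in Queue and Thread. BackgroundProcessor and its methods are hypothetical names made up for this illustration and are not part of Ione, and the upcase call stands in for real work.

```ruby
# Hypothetical helper, not part of Ione: the data listener only enqueues,
# and a separate worker thread does the actual processing.
class BackgroundProcessor
  attr_reader :results

  def initialize
    @queue = Queue.new
    @results = Queue.new
    @worker = Thread.new do
      # Queue#pop blocks until an item is available; nil is the stop signal.
      while (chunk = @queue.pop)
        @results << chunk.upcase # stand-in for expensive processing
      end
    end
  end

  # Cheap enough to be called from a data listener.
  def enqueue(chunk)
    @queue << chunk
  end

  # Asks the worker to finish and waits for it.
  def shutdown
    @queue << nil
    @worker.join
  end
end

processor = BackgroundProcessor.new
on_data = processor.method(:enqueue) # what a handler could pass to on_data
on_data.call('hello')
on_data.call('world')
processor.shutdown
puts processor.results.size
```

With a real connection the listener would be registered with something like connection.on_data(&processor.method(:enqueue)), so the reactor thread spends only the time of a queue push per chunk.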

The IO reactor is completely protocol agnostic, and it's up to you to create objects that can interpret the bytes received from remote hosts and send the correct commands back. When you open a connection you can provide a protocol handler factory as a block (or you can simply wrap the returned connection). The factory creates objects that wrap the raw connection. Such an object can register to receive new data and can write data to the connection. It can also register to be notified when the socket is closed, or close the socket itself.

Examples:

A protocol handler that processes whole lines

io_reactor.connect('example.com', 6543, 10) do |connection|
  LineProtocolHandler.new(connection)
end

# ...

class LineProtocolHandler
  def initialize(connection)
    @connection = connection
    # register a listener method for new data, this must be done in
    # the constructor, and only one listener can be registered
    @connection.on_data(&method(:process_data))
    @buffer = ''
  end

  def process_data(new_data)
    # in this fictional protocol we want to process whole lines, so we
    # append new data to our buffer and then loop as long as there is
    # a newline in the buffer, everything up until a newline is a
    # complete line
    @buffer << new_data
    while newline_index = @buffer.index("\n")
      line = @buffer.slice!(0, newline_index + 1)
      line.chomp!
      # Now do something interesting with the line, but remember that
      # while you're in the data listener method you're executing in the
      # IO reactor thread so you're blocking the reactor from doing
      # other IO work. You should not do any heavy lifting here, but
      # instead hand off the data to your application's other threads.
      # One way of doing that is to create a Ione::Future in the method
      # that sends the request, and then complete the future in this
      # method. How you keep track of which future belongs to which
      # reply is very protocol dependent so you'll have to figure that
      # out yourself.
    end
  end

  def send_request(command_string)
    # This example primarily shows how to implement a data listener
    # method, but this is how you write data to the connection. The
    # method can be called anything, it doesn't have to be #send_request
    @connection.write(command_string)
    # The connection object itself is threadsafe, but to create any
    # interesting protocol you probably need to set up some state for
    # each request so that you know which request to complete when you
    # get data back.
  end
end
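The buffering logic in the handler above can be tried without a network by feeding it chunks that split lines at arbitrary points. This is a sketch under assumptions: FakeConnection is a hypothetical stand-in that implements only on_data plus a receive helper to simulate incoming bytes, and LineCollector is a trimmed-down variant of LineProtocolHandler that stores the completed lines instead of acting on them.

```ruby
# Hypothetical stand-in for a connection: implements only the on_data
# registration used above, plus a helper that simulates incoming bytes.
class FakeConnection
  def on_data(&listener)
    @listener = listener
  end

  def receive(bytes)
    @listener.call(bytes) if @listener
  end
end

# Trimmed-down variant of LineProtocolHandler that collects whole lines.
class LineCollector
  attr_reader :lines

  def initialize(connection)
    @buffer = String.new
    @lines = []
    connection.on_data(&method(:process_data))
  end

  def process_data(new_data)
    @buffer << new_data
    # Everything up to and including a newline is a complete line;
    # whatever follows stays in the buffer until more data arrives.
    while (newline_index = @buffer.index("\n"))
      @lines << @buffer.slice!(0, newline_index + 1).chomp
    end
  end
end

connection = FakeConnection.new
collector = LineCollector.new(connection)
connection.receive("first li")
connection.receive("ne\nsecond line\nthi")
p collector.lines # the trailing "thi" is still buffered, not yet a line
```

The point of the exercise is that chunk boundaries and line boundaries are unrelated, so a handler must never assume that one callback invocation carries exactly one message.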

Since:

  • v1.0.0

Constant Summary

PENDING_STATE = 0
RUNNING_STATE = 1
CRASHED_STATE = 2
STOPPING_STATE = 3
STOPPED_STATE = 4

Since:

  • v1.0.0 (all constants)

Constructor Details

#initialize(options = {}) ⇒ IoReactor

Initializes a new IO reactor.

Parameters:

  • options (Hash) (defaults to: {})

    only used to inject behaviour during tests

Since:

  • v1.0.0



# File 'lib/ione/io/io_reactor.rb', line 93

def initialize(options={})
  @options = options
  @clock = options[:clock] || Time
  @state = PENDING_STATE
  @error_listeners = []
  @unblocker = nil
  @io_loop = IoLoopBody.new(@options)
  @scheduler = Scheduler.new(@options)
  @lock = Mutex.new
end

Instance Method Details

#bind(host, port, options = nil) {|acceptor| ... } ⇒ Ione::Future

Starts a server bound to the specified host and port.

A server is represented by an Acceptor, which wraps the server socket and accepts client connections. By registering to be notified on new connections, via Acceptor#on_accept, you can attach your server handling code to a connection.

Examples:

An echo server

acceptor_future = reactor.bind('0.0.0.0', 11111)
acceptor_future.on_value do |acceptor|
  acceptor.on_accept do |connection|
    connection.on_data do |data|
      connection.write(data)
    end
  end
end

A more realistic server template

class EchoServer
  def initialize(acceptor)
    @acceptor = acceptor
    @acceptor.on_accept do |connection|
      handle_connection(connection)
    end
  end

  def handle_connection(connection)
    connection.on_data do |data|
      connection.write(data)
    end
  end
end

server_future = reactor.bind('0.0.0.0', 11111) do |acceptor|
  EchoServer.new(acceptor)
end

server_future.on_value do |echo_server|
  # this is called when the server has started
end

Parameters:

  • host (String)

    the host to bind to, for example '127.0.0.1' or 'example.com', or '0.0.0.0' to bind to all interfaces

  • port (Integer)

    the port to bind to

  • options (Hash) (defaults to: nil)

Options Hash (options):

  • :backlog (Integer) — default: 5

    the maximum number of pending (unaccepted) connections, i.e. Socket::SOMAXCONN

  • :ssl (OpenSSL::SSL::SSLContext) — default: nil

    when specified the server will use this SSLContext to encrypt connections

Yield Parameters:

  • acceptor (Acceptor)

    the acceptor that wraps the server socket

Returns:

  • (Ione::Future)

    a future that will resolve when the server is bound. The value will be the acceptor, or when a block is given, the value returned by the block.

Since:

  • v1.1.0



# File 'lib/ione/io/io_reactor.rb', line 309

def bind(host, port, options=nil, &block)
  if options.is_a?(Integer) || options.nil?
    backlog = options || 5
    ssl_context = nil
  elsif options
    backlog = options[:backlog] || 5
    ssl_context = options[:ssl]
  end
  if ssl_context
    server = SslAcceptor.new(host, port, backlog, @unblocker, self, ssl_context)
  else
    server = Acceptor.new(host, port, backlog, @unblocker, self)
  end
  f = server.bind
  @io_loop.add_socket(server)
  @unblocker.unblock if running?
  f = f.map(&block) if block_given?
  f
end
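The first lines of the listing above accept either no options, an Integer (used as the backlog), or a Hash. That branching can be isolated and exercised on its own. The helper below is a sketch written for this illustration: normalize_bind_options is a hypothetical name and does not exist in Ione, and it returns a Hash only to make the result easy to inspect.

```ruby
# Hypothetical helper mirroring the option handling at the top of #bind.
# Returns the effective backlog and SSL context as a Hash.
def normalize_bind_options(options = nil)
  if options.is_a?(Integer) || options.nil?
    # A bare Integer is treated as the backlog; SSL is off.
    { backlog: options || 5, ssl: nil }
  else
    { backlog: options[:backlog] || 5, ssl: options[:ssl] }
  end
end

p normalize_bind_options              # no options: default backlog, no SSL
p normalize_bind_options(128)         # Integer: used as the backlog
p normalize_bind_options(backlog: 64) # Hash: :backlog and :ssl are looked up
```

The same shape appears in #connect, where a bare Numeric is treated as the timeout instead of the backlog.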

#cancel_timer(timer_future) ⇒ Object

Cancels a previously scheduled timer.

The timer will fail with a CancelledError.

Parameters:

  • timer_future (Ione::Future)

    the future returned by #schedule_timer

Since:

  • v1.1.3



# File 'lib/ione/io/io_reactor.rb', line 350

def cancel_timer(timer_future)
  @scheduler.cancel_timer(timer_future)
end

#connect(host, port, options = nil) {|connection| ... } ⇒ Ione::Future

Opens a connection to the specified host and port.

Examples:

A naive HTTP client

connection_future = reactor.connect('example.com', 80)
connection_future.on_value do |connection|
  connection.write("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
  connection.on_data do |data|
    print(data)
  end
end

Parameters:

  • host (String)

    the host to connect to

  • port (Integer)

    the port to connect to

  • options (Hash, Numeric) (defaults to: nil)

    a hash of options (see below) or the connection timeout (equivalent to using the :timeout option).

Options Hash (options):

  • :timeout (Numeric) — default: 5

    the number of seconds to wait for a connection before failing

  • :ssl (Boolean, OpenSSL::SSL::SSLContext) — default: false

    pass an OpenSSL::SSL::SSLContext to upgrade the connection to SSL, or true to upgrade the connection and create a new context.

Yield Parameters:

  • connection

    the newly opened connection

Returns:

  • (Ione::Future)

    a future that will resolve when the connection is open. The value will be the connection, or when a block is given, the value returned by the block.

Since:

  • v1.0.0



# File 'lib/ione/io/io_reactor.rb', line 228

def connect(host, port, options=nil, &block)
  if options.is_a?(Numeric) || options.nil?
    timeout = options || 5
    ssl = false
  elsif options
    timeout = options[:timeout] || 5
    ssl = options[:ssl]
  end
  connection = Connection.new(host, port, timeout, @unblocker, @clock)
  f = connection.connect
  @io_loop.add_socket(connection)
  @unblocker.unblock if running?
  if ssl
    f = f.flat_map do
      ssl_context = ssl == true ? nil : ssl
      upgraded_connection = SslConnection.new(host, port, connection.to_io, @unblocker, ssl_context)
      ff = upgraded_connection.connect
      @io_loop.remove_socket(connection)
      @io_loop.add_socket(upgraded_connection)
      @unblocker.unblock
      ff
    end
  end
  f = f.map(&block) if block_given?
  f
end

#on_error {|error| ... } ⇒ Object

Register to receive notifications when the reactor shuts down because of an irrecoverable error.

The listener block will be called in the reactor thread. Any errors that it raises will be ignored.

Yields:

  • (error)

    the error that caused the reactor to stop

Since:

  • v1.0.0



# File 'lib/ione/io/io_reactor.rb', line 111

def on_error(&listener)
  @lock.lock
  begin
    @error_listeners = @error_listeners.dup
    @error_listeners << listener
  ensure
    @lock.unlock
  end
  if @state == RUNNING_STATE || @state == CRASHED_STATE
    @stopped_promise.future.on_failure(&listener)
  end
end
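The listing above replaces the listener array with a copy before appending to it, which looks like a copy-on-write approach: code that already holds a reference to the old array never sees it change. The sketch below shows a slightly stricter variant of that pattern, where the copy is completed before it is published. ListenerRegistry is a hypothetical class for this illustration, not part of Ione, and the reading of the original code's intent is an assumption.

```ruby
# Hypothetical copy-on-write listener list, not part of Ione.
class ListenerRegistry
  def initialize
    @listeners = []
    @lock = Mutex.new
  end

  # Registers a listener by publishing a new array; arrays that were
  # handed out earlier are never mutated.
  def add(&listener)
    @lock.synchronize do
      updated = @listeners.dup
      updated << listener
      @listeners = updated
    end
    self
  end

  # Returns the current array; safe to iterate over without the lock
  # because it will not be modified after publication.
  def snapshot
    @listeners
  end
end

registry = ListenerRegistry.new
before = registry.snapshot
registry.add { |error| warn(error.message) }
puts before.size            # the earlier snapshot is unchanged
puts registry.snapshot.size # the new snapshot contains the listener
```

The trade-off is an array copy per registration, which is acceptable when listeners are registered rarely and iterated from another thread.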

#running? ⇒ Boolean

Returns true as long as the reactor is running. It will be true even after #stop has been called, but false when the future returned by #stop completes.

Returns:

  • (Boolean)

Since:

  • v1.0.0



# File 'lib/ione/io/io_reactor.rb', line 127

def running?
  (state = @state) == RUNNING_STATE || state == STOPPING_STATE
end

#schedule_timer(timeout) ⇒ Ione::Future

Returns a future that completes after the specified number of seconds.

Parameters:

  • timeout (Float)

    the number of seconds to wait until the returned future is completed

Returns:

  • (Ione::Future)

    a future that completes when the timer expires

Since:

  • v1.0.0



# File 'lib/ione/io/io_reactor.rb', line 340

def schedule_timer(timeout)
  @scheduler.schedule_timer(timeout)
end
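The reactor delegates timers to a scheduler that is ticked from the reactor loop, and the constructor accepts a :clock option for tests. How a tick-driven timer with an injectable clock can work is sketched below. This is not Ione's Scheduler: TinyScheduler and FakeClock are hypothetical classes, callbacks are used instead of futures to keep the example short, and the only assumption about a clock is that it responds to now, like Time does.

```ruby
# Hypothetical, simplified tick-driven scheduler; not Ione's Scheduler.
class TinyScheduler
  def initialize(clock = Time)
    @clock = clock
    @timers = [] # pairs of [expiry time, callback]
  end

  def schedule(timeout, &callback)
    @timers << [@clock.now + timeout, callback]
  end

  # Meant to be called once per loop iteration: runs and removes every
  # timer whose expiry time has passed, and returns how many fired.
  def tick
    now = @clock.now
    expired, @timers = @timers.partition { |expires_at, _| expires_at <= now }
    expired.each { |_, callback| callback.call }
    expired.size
  end
end

# A fake clock makes the behaviour deterministic in tests.
class FakeClock
  attr_accessor :now

  def initialize
    @now = 0.0
  end
end

clock = FakeClock.new
scheduler = TinyScheduler.new(clock)
fired = []
scheduler.schedule(5) { fired << :timer }
scheduler.tick  # nothing has expired yet
clock.now = 5.0
scheduler.tick  # the timer fires on this tick
p fired
```

Because timers only fire when the loop ticks, their precision depends on how often the loop runs, which is one more reason to keep data callbacks short.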

#start ⇒ Ione::Future

Starts the reactor. This will spawn a background thread that will manage all connections.

This method is asynchronous and returns a future which completes when the reactor has started.

Returns:

  • (Ione::Future)

    a future that will resolve to the reactor itself

Since:

  • v1.0.0



# File 'lib/ione/io/io_reactor.rb', line 138

def start
  @lock.synchronize do
    if @state == RUNNING_STATE
      return @started_promise.future
    elsif @state == STOPPING_STATE
      return @stopped_promise.future.flat_map { start }.fallback { start }
    else
      @state = RUNNING_STATE
    end
  end
  @unblocker = Unblocker.new
  @io_loop.add_socket(@unblocker)
  @started_promise = Promise.new
  @stopped_promise = Promise.new
  @error_listeners.each do |listener|
    @stopped_promise.future.on_failure(&listener)
  end
  Thread.start do
    @started_promise.fulfill(self)
    error = nil
    begin
      while @state == RUNNING_STATE
        @io_loop.tick
        @scheduler.tick
      end
    rescue => e
      error = e
    ensure
      begin
        @io_loop.close_sockets
        @scheduler.cancel_timers
        @unblocker = nil
      ensure
        if error
          @state = CRASHED_STATE
          @stopped_promise.fail(error)
        else
          @state = STOPPED_STATE
          @stopped_promise.fulfill(self)
        end
      end
    end
  end
  @started_promise.future
end

#stop ⇒ Ione::Future

Stops the reactor.

This method is asynchronous and returns a future which completes when the reactor has completely stopped, or fails with an error if the reactor stops or has already stopped because of a failure.

Returns:

  • (Ione::Future)

    a future that will resolve to the reactor itself

Since:

  • v1.0.0



# File 'lib/ione/io/io_reactor.rb', line 191

def stop
  @lock.synchronize do
    if @state == PENDING_STATE
      Future.resolved(self)
    elsif @state != STOPPED_STATE && @state != CRASHED_STATE
      @state = STOPPING_STATE
      @stopped_promise.future
    else
      @stopped_promise.future
    end
  end
end

#to_s ⇒ Object

Since:

  • v1.0.0



# File 'lib/ione/io/io_reactor.rb', line 354

def to_s
  state_constant_name = self.class.constants.find do |name|
    name.to_s.end_with?('_STATE') && self.class.const_get(name) == @state
  end
  state = state_constant_name.to_s.rpartition('_').first
  %(#<#{self.class.name} #{state}>)
end
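The listing above derives the state name by looking up the constant whose value matches the current state and cutting off the _STATE suffix, so a running reactor would print as something of the form #<Ione::Io::IoReactor RUNNING>. The lookup can be tried in isolation with the sketch below. ReactorStates and name_for are hypothetical names used only here, while the constant values are copied from the Constant Summary.

```ruby
# Hypothetical module reproducing the state-name lookup used by #to_s.
module ReactorStates
  PENDING_STATE = 0
  RUNNING_STATE = 1
  CRASHED_STATE = 2
  STOPPING_STATE = 3
  STOPPED_STATE = 4

  # Finds the *_STATE constant with the given value and strips the
  # suffix; returns an empty string for an unknown value.
  def self.name_for(state)
    constant_name = constants.find do |name|
      name.to_s.end_with?('_STATE') && const_get(name) == state
    end
    constant_name.to_s.rpartition('_').first
  end
end

puts ReactorStates.name_for(ReactorStates::RUNNING_STATE)
puts ReactorStates.name_for(ReactorStates::STOPPING_STATE)
```

Looking the name up reflectively means a newly added state constant is picked up without touching the string formatting code.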