Class: Cql::Io::IoReactor

Inherits:
Object
  • Object
show all
Defined in:
lib/cql/io/io_reactor.rb

Overview

An IO reactor takes care of all the IO for a client. It handles opening new connections, and making sure that connections that have data to send flush to the network, and connections that have data coming in read that data and delegate it to their protocol handlers.

All IO is done in a single background thread, regardless of how many connections you open. There shouldn’t be any problems handling hundreds of connections if needed. All operations are thread safe, but you should take great care when in your protocol handlers to make sure that they don’t do too much work in their data handling callbacks, since those will be run in the reactor thread, and every cycle you use there is a cycle which can’t be used to handle IO.

The IO reactor is completely protocol agnostic, and it’s up to the specified protocol handler factory to create objects that can interpret the bytes received from remote hosts, and to send the correct commands back. The way this works is that when you create an IO reactor you provide a factory that can create protocol handler objects (this factory is most of the time just class, but it could potentially be any object that responds to #new). When you #connect a new protocol handler instance is created and passed a connection. The protocol handler can then register to receive data that arrives over the socket, and it can write data to the socket. It can also register to be notified when the socket is closed, or it can itself close the socket.

See Protocol::CqlProtocolHandler for an example of how the CQL protocol is implemented, and there is an integration tests that implements the Redis protocol that you can look at too.

Examples:

A protocol handler that processes whole lines


class LineProtocolHandler
  def initialize(connection, scheduler)
    @connection = connection
    # register a listener method for new data, this must be done in the
    # in the constructor, and only one listener can be registered
    @connection.on_data(&method(:process_data))
    @buffer = ''
  end

  def process_data(new_data)
    # in this fictional protocol we want to process whole lines, so we
    # append new data to our buffer and then loop as long as there is
    # a newline in the buffer, everything up until a newline is a
    # complete line
    @buffer << new_data
    while newline_index = @buffer.index("\n")
      line = @buffer.slice!(0, newline_index + 1)
      line.chomp!
      # Now do something interesting with the line, but remember that
      # while you're in the data listener method you're executing in the
      # IO reactor thread so you're blocking the reactor from doing
      # other IO work. You should not do any heavy lifting here, but
      # instead hand off the data to your application's other threads.
      # One way of doing that is to create a Cql::Future in the method
      # that sends the request, and then complete the future in this
      # method. How you keep track of which future belongs to which
      # reply is very protocol dependent so you'll have to figure that
      # out yourself.
    end
  end

  def send_request(command_string)
    # This example primarily shows how to implement a data listener
    # method, but this is how you write data to the connection. The
    # method can be called anything, it doesn't have to be #send_request
    @connection.write(command_string)
    # The connection object itself is threadsafe, but to create any
    # interesting protocol you probably need to set up some state for
    # each request so that you know which request to complete when you
    # get data back.
  end
end

Instance Method Summary collapse

Constructor Details

#initialize(protocol_handler_factory, options = {}) ⇒ IoReactor

Initializes a new IO reactor.

Parameters:

  • protocol_handler_factory (Object)

    a class that will be used create the protocol handler objects returned by #connect

  • options (Hash) (defaults to: {})

    only used to inject behaviour during tests



90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/cql/io/io_reactor.rb', line 90

def initialize(protocol_handler_factory, options={})
  @protocol_handler_factory = protocol_handler_factory
  @clock = options[:clock] || Time
  @unblocker = Unblocker.new
  @io_loop = IoLoopBody.new(options)
  @io_loop.add_socket(@unblocker)
  @running = false
  @stopped = false
  @started_promise = Promise.new
  @stopped_promise = Promise.new
  @lock = Mutex.new
end

Instance Method Details

#connect(host, port, timeout) ⇒ Cql::Future

Opens a connection to the specified host and port.

This method is asynchronous and returns a future which completes when the connection has been established, or fails if the connection cannot be established for some reason (the connection takes longer than the specified timeout, the remote host cannot be found, etc.).

The object returned in the future will be an instance of the protocol handler class you passed to #initialize.

Parameters:

  • host (String)

    the host to connect to

  • port (Integer)

    the port to connect to

  • timeout (Numeric)

    the number of seconds to wait for a connection before failing

Returns:

  • (Cql::Future)

    a future that will resolve to a protocol handler object that will be your interface to interact with the connection



184
185
186
187
188
189
190
191
# File 'lib/cql/io/io_reactor.rb', line 184

def connect(host, port, timeout)
  connection = Connection.new(host, port, timeout, @unblocker, @clock)
  f = connection.connect
  protocol_handler = @protocol_handler_factory.new(connection, self)
  @io_loop.add_socket(connection)
  @unblocker.unblock!
  f.map { protocol_handler }
end

#on_error {|error| ... } ⇒ Object

Register to receive notifications when the reactor shuts down because on an irrecoverable error.

The listener block will be called in the reactor thread. Any errors that it raises will be ignored.

Yields:

  • (error)

    the error that cause the reactor to stop



111
112
113
# File 'lib/cql/io/io_reactor.rb', line 111

def on_error(&listener)
  @stopped_promise.future.on_failure(&listener)
end

#running?Boolean

Returns true as long as the reactor is running. It will be true even after #stop has been called, but false when the future returned by #stop completes.

Returns:

  • (Boolean)


119
120
121
# File 'lib/cql/io/io_reactor.rb', line 119

def running?
  @running
end

#schedule_timer(timeout) ⇒ Cql::Future

Returns a future that completes after the specified number of seconds.

Parameters:

  • timeout (Float)

    the number of seconds to wait until the returned future is completed

Returns:

  • (Cql::Future)

    a future that completes when the timer expires



199
200
201
# File 'lib/cql/io/io_reactor.rb', line 199

def schedule_timer(timeout)
  @io_loop.schedule_timer(timeout)
end

#startCql::Future

Starts the reactor. This will spawn a background thread that will manage all connections.

This method is asynchronous and returns a future which completes when the reactor has started.

Returns:

  • (Cql::Future)

    a future that will resolve to the reactor itself



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/cql/io/io_reactor.rb', line 130

def start
  @lock.synchronize do
    raise ReactorError, 'Cannot start a stopped IO reactor' if @stopped
    return @started_promise.future if @running
    @running = true
  end
  Thread.start do
    @started_promise.fulfill(self)
    begin
      @io_loop.tick until @stopped
    ensure
      @io_loop.close_sockets
      @io_loop.cancel_timers
      @running = false
      if $!
        @stopped_promise.fail($!)
      else
        @stopped_promise.fulfill(self)
      end
    end
  end
  @started_promise.future
end

#stopCql::Future

Stops the reactor.

This method is asynchronous and returns a future which completes when the reactor has completely stopped, or fails with an error if the reactor stops or has already stopped because of a failure.

Returns:

  • (Cql::Future)

    a future that will resolve to the reactor itself



162
163
164
165
# File 'lib/cql/io/io_reactor.rb', line 162

def stop
  @stopped = true
  @stopped_promise.future
end

#to_sObject



203
204
205
# File 'lib/cql/io/io_reactor.rb', line 203

def to_s
  @io_loop.to_s
end