Class: Cql::Io::IoReactor

Inherits:
Object
Defined in:
lib/cql/io/io_reactor.rb

Overview

An IoReactor instance manages the connections used by a client.

The reactor starts a thread in which all IO is performed. IoReactor instances are thread-safe.

Constructor Details

#initialize(options = {}) ⇒ IoReactor

Returns a new instance of IoReactor.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :connection_timeout (Integer) — default: 5

    Maximum time to wait for a connection, in seconds.



# File 'lib/cql/io/io_reactor.rb', line 16

def initialize(options={})
  @connection_timeout = options[:connection_timeout] || 5
  @lock = Mutex.new
  @command_queue = []
  @unblocker = UnblockerConnection.new(*IO.pipe)
  @connections = [@unblocker]
  @started_future = Future.new
  @stopped_future = Future.new
  @running = false
end
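
The constructor passes both ends of IO.pipe to UnblockerConnection. The class itself is not shown here, so the following is only a sketch of the general self-pipe technique such an object could rely on: a thread blocked in IO.select is woken when another thread writes a byte to the pipe. The method name wake_select_via_pipe is hypothetical and not part of the library.

```ruby
require 'thread'

# Hypothetical helper, not part of Cql: demonstrates waking a blocked
# IO.select call by writing to a pipe (the "self-pipe" technique).
def wake_select_via_pipe
  reader, writer = IO.pipe
  outcome = Queue.new

  selector = Thread.new do
    # Blocks until the read end becomes readable, or 5 seconds pass.
    readable, = IO.select([reader], nil, nil, 5)
    if readable
      reader.read_nonblock(16) # drain the wake-up byte
      outcome << :woken
    else
      outcome << :timed_out
    end
  end

  writer.write('x') # any thread can do this to unblock the selector
  result = outcome.pop
  selector.join
  result
ensure
  reader.close if reader
  writer.close if writer
end
```

Calling wake_select_via_pipe returns :woken when the write reaches the selecting thread before the timeout expires.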

Instance Method Details

#add_connection(host, port) ⇒ Future<Object>

Establish a new connection.

Parameters:

  • host (String)

    The hostname to connect to

  • port (Integer)

    The port to connect to

Returns:

  • (Future<Object>)

    a future that resolves to the ID of the newly established connection, or fails with a connection error if the connection cannot be established.



# File 'lib/cql/io/io_reactor.rb', line 79

def add_connection(host, port)
  connection = NodeConnection.new(host, port, @connection_timeout)
  connection.on_close do
    @lock.synchronize do
      @connections.delete(connection)
      connection_commands, @command_queue = @command_queue.partition do |command|
        command.is_a?(TargetedRequestCommand) && command.connection_id == connection.connection_id
      end
      connection_commands.each do |command|
        command.future.fail!(ConnectionClosedError.new)
      end
    end
  end
  f = connection.open
  @lock.synchronize do
    @connections << connection
  end
  command_queue_push(nil)
  f
end
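
The on_close handler above uses Array#partition to separate queued commands that target the closed connection from everything else. A minimal sketch of that step in isolation follows. Command, TargetedCommand and split_queue are hypothetical stand-ins for illustration, not the library's classes.

```ruby
# Hypothetical stand-ins for the reactor's command classes.
Command = Struct.new(:name)
TargetedCommand = Struct.new(:name, :connection_id)

# Returns [orphaned, remaining]: commands bound to the closed connection,
# followed by all other commands, each in their original order.
def split_queue(queue, closed_id)
  queue.partition do |command|
    command.is_a?(TargetedCommand) && command.connection_id == closed_id
  end
end

queue = [
  Command.new('any'),
  TargetedCommand.new('to-1', 1),
  TargetedCommand.new('to-2', 2)
]
orphaned, remaining = split_queue(queue, 1)
```

In the reactor, the first group has its futures failed with ConnectionClosedError and the second group replaces the command queue.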

#add_event_listener {|event| ... } ⇒ Object

Registers a listener to receive server-sent events.

Yield Parameters:

  • event

    an event sent by the server



# File 'lib/cql/io/io_reactor.rb', line 117

def add_event_listener(&listener)
  command_queue_push(EventListenerCommand.new(listener))
end

#queue_request(request, connection_id = nil) ⇒ Future<ResultResponse>

Sends a request over a random connection, or over a specific connection when a connection ID is given.

Parameters:

  • request (Cql::Protocol::Request)

    the request to send

  • connection_id (Object) (defaults to: nil)

    the ID of the connection which should be used to send the request

Returns:

  • (Future<ResultResponse>)

    a future representing the result of the request



# File 'lib/cql/io/io_reactor.rb', line 107

def queue_request(request, connection_id=nil)
  command = connection_id ? TargetedRequestCommand.new(request, connection_id) : RequestCommand.new(request)
  command_queue_push(command)
  command.future
end

#running? ⇒ Boolean

Returns whether or not the reactor is running.

Returns:

  • (Boolean)


# File 'lib/cql/io/io_reactor.rb', line 29

def running?
  @running
end

#start ⇒ Future<nil>

Starts the reactor.

Calling this method when the reactor is starting or has already started has no effect.

Returns:

  • (Future<nil>)

    a future which completes when the reactor has started



# File 'lib/cql/io/io_reactor.rb', line 40

def start
  @lock.synchronize do
    unless @running
      @running = true
      @reactor_thread = Thread.start do
        begin
          @started_future.complete!
          io_loop
          @stopped_future.complete!
        rescue => e
          @stopped_future.fail!(e)
          raise
        end
      end
    end
  end
  @started_future
end
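
The start method checks and sets @running inside @lock.synchronize, which is what makes repeated or concurrent calls harmless. The sketch below isolates that guard in a hypothetical TinyLoop class (not part of the library) and counts how many times the worker thread is actually launched.

```ruby
require 'thread'

# Hypothetical class illustrating an idempotent, mutex-guarded start.
class TinyLoop
  attr_reader :starts

  def initialize
    @lock = Mutex.new
    @running = false
    @starts = 0
    @thread = nil
  end

  # Launches the worker thread only on the first call.
  def start
    @lock.synchronize do
      unless @running
        @running = true
        @starts += 1
        @thread = Thread.new { sleep 0.01 while @running }
      end
    end
    self
  end

  # Clears the flag and waits for the worker thread to exit.
  def stop
    @lock.synchronize { @running = false }
    @thread.join if @thread
    self
  end
end
```

However many threads call start on the same TinyLoop, starts stays at 1 until the loop is stopped, because the check and the assignment happen under the same lock.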

#stop ⇒ Future<nil>

Stops the reactor.

Calling this method when the reactor is stopping or has stopped has no effect.

Returns:

  • (Future<nil>)

    a future which completes when the reactor has stopped



# File 'lib/cql/io/io_reactor.rb', line 66

def stop
  @running = false
  command_queue_push(nil)
  @stopped_future
end