Class: Cql::Io::IoReactor
- Inherits: Object
- Defined in: lib/cql/io/io_reactor.rb
Overview
An instance of IO reactor manages the connections used by a client.
The reactor starts a thread in which all IO is performed. The IO reactor instances are thread safe.
Instance Method Summary
- #add_connection(host, port) ⇒ Future<Object>
  Establishes a new connection.
- #add_event_listener {|event| ... } ⇒ Object
  Registers a listener to receive server sent events.
- #initialize(options = {}) ⇒ IoReactor (constructor)
  A new instance of IoReactor.
- #queue_request(request, connection_id = nil) ⇒ Future<ResultResponse>
  Sends a request over a random connection or a specific one.
- #running? ⇒ Boolean
  Returns whether or not the reactor is running.
- #start ⇒ Future<nil>
  Starts the reactor.
- #stop ⇒ Future<nil>
  Stops the reactor.
Constructor Details
#initialize(options = {}) ⇒ IoReactor
Returns a new instance of IoReactor.
# File 'lib/cql/io/io_reactor.rb', line 16
def initialize(options={})
  @connection_timeout = options[:connection_timeout] || 5
  @lock = Mutex.new
  @command_queue = []
  @unblocker = UnblockerConnection.new(*IO.pipe)
  @connections = [@unblocker]
  @started_future = Future.new
  @stopped_future = Future.new
  @running = false
end
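The constructor wraps both ends of `IO.pipe` in an `UnblockerConnection` and registers it as the first connection. This looks like the classic self-pipe technique, where a thread blocked in `IO.select` is woken by writing a byte to a pipe it is watching. The sketch below illustrates that idea only; `Waker` and `wait_for_wake` are hypothetical names, and how `UnblockerConnection` and the IO loop actually work is an assumption, since their source is not shown here.

```ruby
# Hypothetical stand-in for the idea behind UnblockerConnection;
# this is not the library's implementation.
class Waker
  # The read end is what a select loop would watch.
  attr_reader :reader

  def initialize
    @reader, @writer = IO.pipe
  end

  # Called from any thread to wake a thread blocked in IO.select.
  def wake
    @writer.write_nonblock('x')
  rescue IO::WaitWritable
    # The pipe buffer is full, so a wake-up is already pending.
  end

  # Drains pending wake-up bytes so the next select blocks again.
  def drain
    loop { @reader.read_nonblock(64) }
  rescue IO::WaitReadable, EOFError
    nil
  end
end

# Blocks until the waker is triggered or the timeout (in seconds) expires.
# Returns true if woken and false on timeout.
def wait_for_wake(waker, timeout)
  readable, = IO.select([waker.reader], nil, nil, timeout)
  return false unless readable
  waker.drain
  true
end
```

A select loop built this way can sleep indefinitely on its sockets and still react promptly when another thread queues work, which would explain why the pipe is added to the same list as the node connections.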
Instance Method Details
#add_connection(host, port) ⇒ Future<Object>
Establishes a new connection.
# File 'lib/cql/io/io_reactor.rb', line 79
def add_connection(host, port)
  connection = NodeConnection.new(host, port, @connection_timeout)
  connection.on_close do
    @lock.synchronize do
      @connections.delete(connection)
      connection_commands, @command_queue = @command_queue.partition do |command|
        command.is_a?(TargetedRequestCommand) && command.connection_id == connection.connection_id
      end
      connection_commands.each do |command|
        command.future.fail!(ConnectionClosedError.new)
      end
    end
  end
  f = connection.open
  @lock.synchronize do
    @connections << connection
  end
  command_queue_push(nil)
  f
end
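The `on_close` callback uses `partition` to split the command queue in two: commands targeted at the closed connection, which are failed, and everything else, which stays queued. A minimal, self-contained sketch of that split follows; `Command`, `TargetedCommand`, and `split_for_closed_connection` are hypothetical stand-ins, not the library's classes.

```ruby
# Hypothetical stand-ins for RequestCommand and TargetedRequestCommand.
Command = Struct.new(:name)
TargetedCommand = Struct.new(:name, :connection_id)

# Returns [orphaned, remaining]: commands bound to the closed connection,
# followed by the commands that can still be served by other connections.
def split_for_closed_connection(queue, closed_id)
  queue.partition do |command|
    command.is_a?(TargetedCommand) && command.connection_id == closed_id
  end
end

queue = [
  Command.new('a'),            # untargeted, stays queued
  TargetedCommand.new('b', 1), # bound to connection 1
  TargetedCommand.new('c', 2)  # bound to connection 2
]
orphaned, remaining = split_for_closed_connection(queue, 1)
```

With connection 1 closed, only command `b` lands in `orphaned`; untargeted commands and commands for other connections are kept in their original order.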
#add_event_listener {|event| ... } ⇒ Object
Registers a listener to receive server sent events.
# File 'lib/cql/io/io_reactor.rb', line 117
def add_event_listener(&listener)
  command_queue_push(EventListenerCommand.new(listener))
end
#queue_request(request, connection_id = nil) ⇒ Future<ResultResponse>
Sends a request over a random connection or a specific one.
# File 'lib/cql/io/io_reactor.rb', line 107
def queue_request(request, connection_id=nil)
  command = connection_id ? TargetedRequestCommand.new(request, connection_id) : RequestCommand.new(request)
  command_queue_push(command)
  command.future
end
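`queue_request` returns immediately with the command's future, and the response arrives later from the IO thread. The sketch below shows that hand-off in isolation, assuming a blocking future for simplicity. `TinyFuture`, `PendingCommand`, `enqueue`, and `process_one` are hypothetical; the library's `Future` class has its own API, which is not documented on this page.

```ruby
# Hypothetical minimal future; not the library's Future class.
class TinyFuture
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @done = false
  end

  # Resolves the future and wakes any threads blocked in #value.
  def complete(value)
    @lock.synchronize do
      @value = value
      @done = true
      @cond.broadcast
    end
  end

  # Blocks until the future has been resolved, then returns the value.
  def value
    @lock.synchronize do
      @cond.wait(@lock) until @done
      @value
    end
  end
end

PendingCommand = Struct.new(:request, :future)

# Enqueues the request and immediately hands back its future.
def enqueue(queue, request)
  command = PendingCommand.new(request, TinyFuture.new)
  queue << command
  command.future
end

# Stand-in for the IO thread: pops one command and resolves its future.
def process_one(queue)
  command = queue.pop
  command.future.complete("response to #{command.request}")
end
```

The caller keeps only the future, so it never needs to know which thread or connection ends up serving the request.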
#running? ⇒ Boolean
Returns whether or not the reactor is running.
# File 'lib/cql/io/io_reactor.rb', line 29
def running?
  @running
end
#start ⇒ Future<nil>
Starts the reactor.
Calling this method when the reactor is starting or is already running has no effect; the same future is returned.
# File 'lib/cql/io/io_reactor.rb', line 40
def start
  @lock.synchronize do
    unless @running
      @running = true
      @reactor_thread = Thread.start do
        begin
          @started_future.complete!
          io_loop
          @stopped_future.complete!
        rescue => e
          @stopped_future.fail!(e)
          raise
        end
      end
    end
  end
  @started_future
end
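`start` checks and sets `@running` inside the mutex, so concurrent callers cannot spawn more than one reactor thread. The following sketch isolates that start-once guard; `OnceStarter` and its `start_count` counter are hypothetical and exist only to make the behaviour observable.

```ruby
# Hypothetical miniature of the start-once pattern; not the library's class.
class OnceStarter
  attr_reader :start_count

  def initialize
    @lock = Mutex.new
    @running = false
    @start_count = 0
  end

  # Starts the worker thread at most once, however many threads call it.
  def start
    @lock.synchronize do
      unless @running
        @running = true
        @start_count += 1
        @thread = Thread.new { sleep 0.01 } # placeholder for the IO loop
      end
    end
    self
  end

  # Waits for the worker thread, if one was started.
  def join
    @thread.join if @thread
  end
end
```

Because the check and the assignment happen under the same lock, a second caller always observes `@running == true` and skips thread creation.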
#stop ⇒ Future<nil>
Stops the reactor.
Calling this method when the reactor is stopping or has stopped has no effect.
# File 'lib/cql/io/io_reactor.rb', line 66
def stop
  @running = false
  command_queue_push(nil)
  @stopped_future
end