Class: Cql::Protocol::CqlProtocolHandler
- Inherits:
-
Object
- Object
- Cql::Protocol::CqlProtocolHandler
- Defined in:
- lib/cql/protocol/cql_protocol_handler.rb
Overview
This class wraps a single connection and translates between request/ response frames and raw bytes.
You send requests with #send_request, and receive responses through the returned future.
Instances of this class are thread safe.
Defined Under Namespace
Classes: RequestPromise
Instance Attribute Summary collapse
-
#keyspace ⇒ String
readonly
The current keyspace for the underlying connection.
Instance Method Summary collapse
-
#[](key) ⇒ Object
The value associated with the key.
-
#[]=(key, value) ⇒ Object
Associate arbitrary data with this protocol handler object.
-
#close ⇒ Cql::Future
Closes the underlying connection.
-
#closed? ⇒ true, false
True if the underlying connection is closed.
-
#connected? ⇒ true, false
True if the underlying connection is connected.
-
#host ⇒ String
Returns the hostname of the underlying connection.
-
#initialize(connection, scheduler, compressor = nil) ⇒ CqlProtocolHandler
constructor
A new instance of CqlProtocolHandler.
-
#on_closed {|error| ... } ⇒ Object
Register to receive notification when the underlying connection has closed.
-
#on_event {|event| ... } ⇒ Object
Register to receive server sent events, like schema changes, nodes going up or down, etc.
-
#port ⇒ Integer
Returns the port of the underlying connection.
-
#send_request(request, timeout = nil) ⇒ Cql::Future<Cql::Protocol::Response>
Serializes and send a request over the underlying connection.
Constructor Details
#initialize(connection, scheduler, compressor = nil) ⇒ CqlProtocolHandler
Returns a new instance of CqlProtocolHandler.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 22 def initialize(connection, scheduler, compressor=nil) @connection = connection @scheduler = scheduler @compressor = compressor @connection.on_data(&method(:receive_data)) @connection.on_closed(&method(:socket_closed)) @promises = Array.new(128) { nil } @read_buffer = ByteBuffer.new @frame_encoder = FrameEncoder.new(@compressor) @frame_decoder = FrameDecoder.new(@compressor) @current_frame = FrameDecoder::NULL_FRAME @request_queue_in = [] @request_queue_out = [] @event_listeners = [] @data = {} @lock = Mutex.new @closed_promise = Promise.new @keyspace = nil end |
Instance Attribute Details
#keyspace ⇒ String (readonly)
Returns the current keyspace for the underlying connection.
20 21 22 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 20 def keyspace @keyspace end |
Instance Method Details
#[](key) ⇒ Object
Returns the value associated with the key.
66 67 68 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 66 def [](key) @lock.synchronize { @data[key] } end |
#[]=(key, value) ⇒ Object
Associate arbitrary data with this protocol handler object. This is useful in situations where additional metadata can be loaded after the connection has been set up, or to keep statistics specific to the connection this protocol handler wraps.
60 61 62 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 60 def []=(key, value) @lock.synchronize { @data[key] = value } end |
#close ⇒ Cql::Future
Closes the underlying connection.
147 148 149 150 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 147 def close @connection.close @closed_promise.future end |
#closed? ⇒ true, false
Returns true if the underlying connection is closed.
76 77 78 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 76 def closed? @connection.closed? end |
#connected? ⇒ true, false
Returns true if the underlying connection is connected.
71 72 73 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 71 def connected? @connection.connected? end |
#host ⇒ String
Returns the hostname of the underlying connection
45 46 47 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 45 def host @connection.host end |
#on_closed {|error| ... } ⇒ Object
Register to receive notification when the underlying connection has closed. If the connection closed abruptly the error will be passed to the listener, otherwise it will not receive any parameters.
86 87 88 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 86 def on_closed(&listener) @closed_promise.future.on_complete(&listener) end |
#on_event {|event| ... } ⇒ Object
Register to receive server sent events, like schema changes, nodes going up or down, etc. To actually receive events you also need to send a REGISTER request for the events you wish to receive.
95 96 97 98 99 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 95 def on_event(&listener) @lock.synchronize do @event_listeners += [listener] end end |
#port ⇒ Integer
Returns the port of the underlying connection
52 53 54 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 52 def port @connection.port end |
#send_request(request, timeout = nil) ⇒ Cql::Future<Cql::Protocol::Response>
Serializes and send a request over the underlying connection.
Returns a future that will resolve to the response. When the connection closes the futures of all active requests will be failed with the error that caused the connection to close, or nil.
When ‘timeout` is specified the future will fail with TimeoutError after that many seconds have passed. If a response arrives after that time it will be lost. If a response never arrives for the request the channel occupied by the request will not be reused.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 117 def send_request(request, timeout=nil) return Future.failed(NotConnectedError.new) if closed? promise = RequestPromise.new(request, @frame_encoder) id = nil @lock.synchronize do if (id = next_stream_id) @promises[id] = promise end end if id @connection.write do |buffer| @frame_encoder.encode_frame(request, id, buffer) end else @lock.synchronize do promise.encode_frame! @request_queue_in << promise end end if timeout @scheduler.schedule_timer(timeout).on_value do promise.time_out! end end promise.future end |