Class: Cql::Protocol::CqlProtocolHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/cql/protocol/cql_protocol_handler.rb

Overview

This class wraps a single connection and translates between request/ response frames and raw bytes.

You send requests with #send_request, and receive responses through the returned future.

Instances of this class are thread safe.

Examples:

Sending an OPTIONS request

future = protocol_handler.send_request(Cql::Protocol::OptionsRequest.new)
response = future.get
puts "These options are supported: #{response.options}"

Defined Under Namespace

Classes: RequestPromise

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, scheduler, compressor = nil) ⇒ CqlProtocolHandler

Returns a new instance of CqlProtocolHandler.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 22

def initialize(connection, scheduler, compressor=nil)
  @connection = connection
  @scheduler = scheduler
  @compressor = compressor
  @connection.on_data(&method(:receive_data))
  @connection.on_closed(&method(:socket_closed))
  @promises = Array.new(128) { nil }
  @read_buffer = ByteBuffer.new
  @frame_encoder = FrameEncoder.new(@compressor)
  @frame_decoder = FrameDecoder.new(@compressor)
  @current_frame = FrameDecoder::NULL_FRAME
  @request_queue_in = []
  @request_queue_out = []
  @event_listeners = []
  @data = {}
  @lock = Mutex.new
  @closed_promise = Promise.new
  @keyspace = nil
end

Instance Attribute Details

#keyspaceString (readonly)

Returns the current keyspace for the underlying connection.

Returns:

  • (String)

    the current keyspace for the underlying connection



20
21
22
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 20

def keyspace
  @keyspace
end

Instance Method Details

#[](key) ⇒ Object

Returns the value associated with the key.

Returns:

  • the value associated with the key

See Also:

  • Cql::Protocol::CqlProtocolHandler.{{#[]=}


66
67
68
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 66

def [](key)
  @lock.synchronize { @data[key] }
end

#[]=(key, value) ⇒ Object

Associate arbitrary data with this protocol handler object. This is useful in situations where additional metadata can be loaded after the connection has been set up, or to keep statistics specific to the connection this protocol handler wraps.



60
61
62
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 60

def []=(key, value)
  @lock.synchronize { @data[key] = value }
end

#closeCql::Future

Closes the underlying connection.

Returns:

  • (Cql::Future)

    a future that completes when the connection has closed



147
148
149
150
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 147

def close
  @connection.close
  @closed_promise.future
end

#closed?true, false

Returns true if the underlying connection is closed.

Returns:

  • (true, false)

    true if the underlying connection is closed



76
77
78
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 76

def closed?
  @connection.closed?
end

#connected?true, false

Returns true if the underlying connection is connected.

Returns:

  • (true, false)

    true if the underlying connection is connected



71
72
73
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 71

def connected?
  @connection.connected?
end

#hostString

Returns the hostname of the underlying connection

Returns:

  • (String)

    the hostname



45
46
47
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 45

def host
  @connection.host
end

#on_closed {|error| ... } ⇒ Object

Register to receive notification when the underlying connection has closed. If the connection closed abruptly the error will be passed to the listener, otherwise it will not receive any parameters.

Yield Parameters:

  • error (nil, Error)

    the error that caused the connection to close, if any



86
87
88
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 86

def on_closed(&listener)
  @closed_promise.future.on_complete(&listener)
end

#on_event {|event| ... } ⇒ Object

Register to receive server sent events, like schema changes, nodes going up or down, etc. To actually receive events you also need to send a REGISTER request for the events you wish to receive.

Yield Parameters:



95
96
97
98
99
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 95

def on_event(&listener)
  @lock.synchronize do
    @event_listeners += [listener]
  end
end

#portInteger

Returns the port of the underlying connection

Returns:

  • (Integer)

    the port



52
53
54
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 52

def port
  @connection.port
end

#send_request(request, timeout = nil) ⇒ Cql::Future<Cql::Protocol::Response>

Serializes and send a request over the underlying connection.

Returns a future that will resolve to the response. When the connection closes the futures of all active requests will be failed with the error that caused the connection to close, or nil.

When ‘timeout` is specified the future will fail with TimeoutError after that many seconds have passed. If a response arrives after that time it will be lost. If a response never arrives for the request the channel occupied by the request will not be reused.

Parameters:

  • request (Cql::Protocol::Request)
  • timeout (Float) (defaults to: nil)

    an optional number of seconds to wait until failing the request

Returns:



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/cql/protocol/cql_protocol_handler.rb', line 117

def send_request(request, timeout=nil)
  return Future.failed(NotConnectedError.new) if closed?
  promise = RequestPromise.new(request, @frame_encoder)
  id = nil
  @lock.synchronize do
    if (id = next_stream_id)
      @promises[id] = promise
    end
  end
  if id
    @connection.write do |buffer|
      @frame_encoder.encode_frame(request, id, buffer)
    end
  else
    @lock.synchronize do
      promise.encode_frame!
      @request_queue_in << promise
    end
  end
  if timeout
    @scheduler.schedule_timer(timeout).on_value do
      promise.time_out!
    end
  end
  promise.future
end