Class: ActiveCypher::Bolt::Connection
- Inherits:
-
Object
- Object
- ActiveCypher::Bolt::Connection
- Includes:
- VersionEncoding
- Defined in:
- lib/active_cypher/bolt/connection.rb
Constant Summary collapse
- SUPPORTED_VERSIONS =
[5.8, 5.2].freeze
Instance Attribute Summary collapse
-
#adapter ⇒ Object
readonly
Returns the value of attribute adapter.
-
#connection_id ⇒ Object
readonly
Returns the value of attribute connection_id.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#protocol_handler ⇒ Object
readonly
Access to the protocol handler.
-
#protocol_version ⇒ Object
readonly
Returns the value of attribute protocol_version.
-
#server_agent ⇒ Object
readonly
Returns the value of attribute server_agent.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#timeout_seconds ⇒ Object
readonly
Returns the value of attribute timeout_seconds.
Instance Method Summary collapse
-
#async_read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object
Asynchronously execute a read transaction.
-
#async_write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object
Asynchronously execute a write transaction.
-
#close ⇒ Object
Closes the TCP connection if it’s open.
-
#concurrency ⇒ Integer
A single Bolt socket is strictly single‑plex:.
-
#connect ⇒ Object
Establishes the connection to the database.
-
#connected? ⇒ Boolean
Checks if the connection is open and the socket is alive.
-
#database_info ⇒ Hash
Returns comprehensive database information.
-
#database_type ⇒ Symbol
Returns the database type detected from server agent.
-
#dump(label, bytes) ⇒ Object
Debug output for those who enjoy hexadecimal existentialism.
-
#handle_hello_failure(metadata) ⇒ Object
Handles a FAILURE response to HELLO.
-
#handle_hello_success(metadata) ⇒ Object
Handles a SUCCESS response to HELLO.
-
#health_check ⇒ Hash
Performs a health check using database-appropriate queries.
-
#initialize(host, port, adapter, auth_token:, timeout_seconds: 15, secure: false, verify_cert: true) ⇒ Connection
constructor
Initializes a new Bolt connection.
-
#inspect ⇒ Object
Override inspect to redact sensitive information.
-
#perform_handshake ⇒ Object
Performs the Bolt handshake sequence.
-
#read(length) ⇒ Object
Reads data from the socket.
-
#read_message ⇒ Object
Reads a Bolt message using the MessageReader.
-
#read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object
Synchronously execute a read transaction.
-
#reconnect ⇒ Boolean
Attempts to reconnect if the connection is lost.
-
#reset! ⇒ Boolean
Resets the connection state.
-
#reusable? ⇒ Boolean
Re‑use only if still alive:.
-
#send_hello ⇒ Object
Sends the HELLO message.
-
#send_logon ⇒ Object
Sends the LOGON message.
-
#server_status ⇒ Array<Hash>?
Returns server/cluster status information (when available).
-
#session ⇒ Bolt::Session
Returns a fresh Session object that re‑uses this TCP/Bolt socket.
-
#version ⇒ Hash
Returns parsed version information from the server agent string.
-
#viable? ⇒ Boolean
This method is required by Async::Pool to check if the connection is viable for reuse.
-
#write(data) ⇒ Object
Writes data to the socket.
-
#write_message(message, debug_label = nil) ⇒ Object
Writes a Bolt message using the MessageWriter, adding Bolt chunking.
-
#write_raw(bytes) ⇒ Object
Writes raw bytes directly to the socket.
-
#write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object
Synchronously execute a write transaction.
Constructor Details
#initialize(host, port, adapter, auth_token:, timeout_seconds: 15, secure: false, verify_cert: true) ⇒ Connection
The ceremony required to instantiate a connection. Because nothing says “enterprise” like 8 arguments.
Initializes a new Bolt connection.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/active_cypher/bolt/connection.rb', line 53 def initialize(host, port, adapter, auth_token:, timeout_seconds: 15, secure: false, verify_cert: true) @host = host @port = port @auth_token = auth_token @timeout_seconds = timeout_seconds @secure = secure @verify_cert = verify_cert @adapter = adapter @socket = nil @connected = false @protocol_version = nil @server_agent = nil @connection_id = nil @reconnect_attempts = 0 @max_reconnect_attempts = 3 end |
Instance Attribute Details
#adapter ⇒ Object (readonly)
Returns the value of attribute adapter.
17 18 19 |
# File 'lib/active_cypher/bolt/connection.rb', line 17 def adapter @adapter end |
#connection_id ⇒ Object (readonly)
Returns the value of attribute connection_id.
17 18 19 |
# File 'lib/active_cypher/bolt/connection.rb', line 17 def connection_id @connection_id end |
#host ⇒ Object (readonly)
Returns the value of attribute host.
17 18 19 |
# File 'lib/active_cypher/bolt/connection.rb', line 17 def host @host end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
17 18 19 |
# File 'lib/active_cypher/bolt/connection.rb', line 17 def port @port end |
#protocol_handler ⇒ Object (readonly)
Access to the protocol handler
440 441 442 |
# File 'lib/active_cypher/bolt/connection.rb', line 440 def protocol_handler @protocol_handler end |
#protocol_version ⇒ Object (readonly)
Returns the value of attribute protocol_version.
17 18 19 |
# File 'lib/active_cypher/bolt/connection.rb', line 17 def protocol_version @protocol_version end |
#server_agent ⇒ Object (readonly)
Returns the value of attribute server_agent.
17 18 19 |
# File 'lib/active_cypher/bolt/connection.rb', line 17 def server_agent @server_agent end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
17 18 19 |
# File 'lib/active_cypher/bolt/connection.rb', line 17 def socket @socket end |
#timeout_seconds ⇒ Object (readonly)
Returns the value of attribute timeout_seconds.
17 18 19 |
# File 'lib/active_cypher/bolt/connection.rb', line 17 def timeout_seconds @timeout_seconds end |
Instance Method Details
#async_read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object
Asynchronously execute a read transaction.
484 485 486 |
# File 'lib/active_cypher/bolt/connection.rb', line 484 def async_read_transaction(db: nil, timeout: nil, metadata: nil, &) session(database: db).async_read_transaction(db: db, timeout: timeout, metadata: , &) end |
#async_write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object
Asynchronously execute a write transaction.
489 490 491 |
# File 'lib/active_cypher/bolt/connection.rb', line 489 def async_write_transaction(db: nil, timeout: nil, metadata: nil, &) session(database: db).async_write_transaction(db: db, timeout: timeout, metadata: , &) end |
#close ⇒ Object
The digital equivalent of ghosting.
Closes the TCP connection if it’s open.
135 136 137 138 139 140 141 |
# File 'lib/active_cypher/bolt/connection.rb', line 135 def close @socket.close if connected? rescue IOError ensure @socket = nil @connected = false end |
#concurrency ⇒ Integer
A single Bolt socket is strictly single‑plex:
219 |
# File 'lib/active_cypher/bolt/connection.rb', line 219 def concurrency = 1 |
#connect ⇒ Object
Attempts to connect, or at least to feel something.
Establishes the connection to the database.
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/active_cypher/bolt/connection.rb', line 80 def connect return if connected? # Using a variable to track errors instead of re-raising inside the Async block error = nil begin Async do |task| task.with_timeout(@timeout_seconds) do @socket = open_socket perform_handshake @connected = true @reconnect_attempts = 0 end rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, SocketError, OpenSSL::SSL::SSLError => e # Catch connection errors inside the task close # Store the error instead of raising error = ConnectionError.new("Failed to connect to #{host}:#{port} - #{e.}") rescue StandardError => e # Catch any other errors inside the task close # Store the error instead of raising error = ConnectionError.new("Error during connection: #{e.}") end.wait rescue Async::TimeoutError => e error = ConnectionError.new("Connection timed out to #{host}:#{port} - #{e.}") rescue StandardError => e close error = ConnectionError.new("Connection error: #{e.}") end # After the Async block is complete, raise the error if one occurred raise error if error end |
#connected? ⇒ Boolean
Checks if we’re still pretending to be connected.
Checks if the connection is open and the socket is alive.
147 |
# File 'lib/active_cypher/bolt/connection.rb', line 147 def connected? = @connected && socket_open? |
#database_info ⇒ Hash
Returns comprehensive database information.
544 545 546 547 548 549 550 551 552 553 554 555 |
# File 'lib/active_cypher/bolt/connection.rb', line 544 def database_info info = version.dup health = health_check info.merge({ healthy: health[:healthy], response_time_ms: health[:response_time_ms], server_agent: @server_agent, connection_id: @connection_id, protocol_version: @protocol_version }) end |
#database_type ⇒ Symbol
Returns the database type detected from server agent.
510 511 512 |
# File 'lib/active_cypher/bolt/connection.rb', line 510 def database_type version[:database_type] end |
#dump(label, bytes) ⇒ Object
Debug output for those who enjoy hexadecimal existentialism.
212 213 214 |
# File 'lib/active_cypher/bolt/connection.rb', line 212 def dump(label, bytes) puts "[DEBUG] #{label.ljust(18)}: #{bytes.bytes.map { |b| b.to_s(16).rjust(2, '0') }.join(' ')}" if ENV['DEBUG'] end |
#handle_hello_failure(metadata) ⇒ Object
The more common outcome.
Handles a FAILURE response to HELLO.
377 378 379 380 381 382 |
# File 'lib/active_cypher/bolt/connection.rb', line 377 def handle_hello_failure() code = ['code'] = ['message'] close raise ConnectionError, "Authentication failed: #{code} - #{}" end |
#handle_hello_success(metadata) ⇒ Object
The rarest of all outcomes.
Handles a SUCCESS response to HELLO.
369 370 371 372 |
# File 'lib/active_cypher/bolt/connection.rb', line 369 def handle_hello_success() @connection_id = ['connection_id'] @server_agent = ['server'] end |
#health_check ⇒ Hash
Uses different queries based on detected database type
Performs a health check using database-appropriate queries.
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 |
# File 'lib/active_cypher/bolt/connection.rb', line 518 def health_check return { healthy: false, response_time_ms: nil, details: 'Not connected' } unless connected? result = nil begin Async do result = case database_type when :neo4j perform_neo4j_health_check when :memgraph perform_memgraph_health_check else perform_generic_health_check end end.wait result rescue ConnectionError, ProtocolError => e { healthy: false, response_time_ms: nil, details: "Health check failed: #{e.}" } end end |
#inspect ⇒ Object
Override inspect to redact sensitive information
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/active_cypher/bolt/connection.rb', line 21 def inspect filtered_auth = ActiveCypher::Redaction.filter_hash(@auth_token) attributes = { host: @host.inspect, port: @port.inspect, auth_token: filtered_auth.inspect, timeout_seconds: @timeout_seconds.inspect, secure: @secure.inspect, verify_cert: @verify_cert.inspect, connected: @connected.inspect, protocol_version: @protocol_version.inspect, server_agent: @server_agent.inspect, connection_id: @connection_id.inspect } "#<#{self.class.name}:0x#{object_id.to_s(16)} #{attributes.map { |k, v| "@#{k}=#{v}" }.join(', ')}>" end |
#perform_handshake ⇒ Object
The digital equivalent of a secret handshake, but with more bytes and less trust.
Performs the Bolt handshake sequence.
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
# File 'lib/active_cypher/bolt/connection.rb', line 250 def perform_handshake # Bolt Magic Preamble (0x6060B017) magic = "\x60\x60\xB0\x17" dump('Magic', magic) write_raw(magic) # Proposed Bolt Versions (ordered by preference) # Encoded as 4‑byte big‑endian integers proposed_versions = (SUPPORTED_VERSIONS + [0, 0])[0, 4] versions = proposed_versions.map { |v| encode_version(v) }.join dump('Sending versions', versions) write_raw(versions) # Read agreed version (4 bytes) agreed_version_bytes = read_raw(4) dump('Agreed version', agreed_version_bytes) @protocol_version = decode_version(agreed_version_bytes) # Validate agreed version unless SUPPORTED_VERSIONS.include?(@protocol_version) close raise ProtocolError, "Server only supports unsupported Bolt protocol (#{@protocol_version}). This client requires one of: #{SUPPORTED_VERSIONS.join(', ')}" end # Send HELLO message send_hello # Read response (should be SUCCESS or FAILURE) response = begin msg = msg rescue EOFError => e raise ConnectionError, "Server closed connection: #{e.}" end case response when Messaging::Success handle_hello_success(response.) # if auth credentials were provided, send LOGON send_logon if @auth_token && @auth_token[:scheme] == 'basic' # Let adapter create protocol handler instead of directly instantiating @protocol_handler = @adapter.create_protocol_handler(self) when Messaging::Failure handle_hello_failure(response.) else close raise ProtocolError, "Unexpected response during handshake: #{response.class}" end rescue ConnectionError, ProtocolError => e close raise e rescue StandardError => e close raise ConnectionError, "Handshake error: #{e.}" end |
#read(length) ⇒ Object
Reading from the void, hoping something meaningful comes back.
Reads data from the socket.
199 200 201 202 203 204 205 206 |
# File 'lib/active_cypher/bolt/connection.rb', line 199 def read(length) raise ConnectionError, 'Not connected' unless connected? @socket.read_exactly(length) rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, IOError => e close raise ConnectionError, "Connection lost during read: #{e.}" end |
#read_message ⇒ Object
Reads from the abyss and hopes for a message, not a void.
Reads a Bolt message using the MessageReader.
426 427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/active_cypher/bolt/connection.rb', line 426 def raise ConnectionError, 'Socket not open for reading' unless socket_open? reader = MessageReader.new(@socket) reader. rescue ConnectionError, ProtocolError => e close raise e rescue EOFError => e close raise ConnectionError, "Connection closed unexpectedly: #{e.}" end |
#read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object
Synchronously execute a read transaction.
474 475 476 |
# File 'lib/active_cypher/bolt/connection.rb', line 474 def read_transaction(db: nil, timeout: nil, metadata: nil, &) session(database: db).read_transaction(db: db, timeout: timeout, metadata: , &) end |
#reconnect ⇒ Boolean
Attempts to reconnect, because hope springs eternal.
Attempts to reconnect if the connection is lost.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/active_cypher/bolt/connection.rb', line 153 def reconnect return true if connected? @reconnect_attempts += 1 if @reconnect_attempts <= @max_reconnect_attempts close begin connect # Reset reconnect counter on successful connection @reconnect_attempts = 0 true rescue ConnectionError => e # Log the error but don't raise puts "Reconnection attempt #{@reconnect_attempts}/#{@max_reconnect_attempts} failed: #{e.}" if ENV['DEBUG'] # Sleep to avoid hammering the server sleep(0.1 * @reconnect_attempts) false end else # Reset the counter after max attempts to allow future reconnects @reconnect_attempts = 0 false end end |
#reset! ⇒ Boolean
For when you want to pretend nothing ever happened.
Resets the connection state.
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 |
# File 'lib/active_cypher/bolt/connection.rb', line 446 def reset! return false unless connected? begin (ActiveCypher::Bolt::Messaging::Reset.new) msg = # should be Messaging::Success msg.is_a?(ActiveCypher::Bolt::Messaging::Success) rescue ConnectionError, ProtocolError # If reset fails, close the connection close false rescue StandardError # For any other errors, also close the connection close false end end |
#reusable? ⇒ Boolean
Re‑use only if still alive:
224 |
# File 'lib/active_cypher/bolt/connection.rb', line 224 def reusable? = connected? |
#send_hello ⇒ Object
Because every protocol needs a little small talk before the pain begins.
Sends the HELLO message.
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/active_cypher/bolt/connection.rb', line 312 def send_hello user_agent = "ActiveCypher::Bolt/#{ActiveCypher::VERSION} (Ruby/#{RUBY_VERSION})" platform = RUBY_DESCRIPTION.split[1..].join(' ') # Gets everything after "ruby" in RUBY_DESCRIPTION = { 'user_agent' => user_agent, 'notifications_minimum_severity' => 'WARNING', 'bolt_agent' => { 'product' => user_agent, 'platform' => platform, 'language' => "#{RUBY_PLATFORM}/#{RUBY_VERSION}", 'language_details' => "#{RUBY_ENGINE} #{RUBY_ENGINE_VERSION}" } } = Messaging::Hello.new() (, 'HELLO') end |
#send_logon ⇒ Object
Because authentication is just another opportunity for disappointment.
Sends the LOGON message.
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 |
# File 'lib/active_cypher/bolt/connection.rb', line 332 def send_logon # Get credentials from the connection's auth token = { 'scheme' => @auth_token[:scheme], 'principal' => @auth_token[:principal], 'credentials' => @auth_token[:credentials] } # Create and send LOGON message begin logon_msg = Messaging::Logon.new() (logon_msg, 'LOGON') # Read and process response logon_response = case logon_response when Messaging::Success true when Messaging::Failure code = logon_response.['code'] = logon_response.['message'] close raise ConnectionError, "Authentication failed during LOGON: #{code} - #{}" else close raise ProtocolError, "Unexpected response to LOGON: #{logon_response.class}" end rescue StandardError => e close raise ConnectionError, "Authentication error: #{e.}" end end |
#server_status ⇒ Array<Hash>?
Returns server/cluster status information (when available).
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 |
# File 'lib/active_cypher/bolt/connection.rb', line 560 def server_status return nil unless connected? begin case database_type when :neo4j get_neo4j_server_status when :memgraph get_memgraph_server_status else nil end rescue ConnectionError, ProtocolError nil # Gracefully handle unsupported operations end end |
#session ⇒ Bolt::Session
Because every connection deserves a second chance.
Returns a fresh Session object that re‑uses this TCP/Bolt socket.
469 470 471 |
# File 'lib/active_cypher/bolt/connection.rb', line 469 def session(**) Bolt::Session.new(self, **) end |
#version ⇒ Hash
Extracts version from server_agent captured during handshake
Returns parsed version information from the server agent string.
501 502 503 504 505 |
# File 'lib/active_cypher/bolt/connection.rb', line 501 def version return @version if defined?(@version) @version = parse_version_from_server_agent end |
#viable? ⇒ Boolean
The database equivalent of “are you still there?”
This method is required by Async::Pool to check if the connection is viable for reuse
230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/active_cypher/bolt/connection.rb', line 230 def viable? return false unless connected? # Use the health check method to determine viability health = health_check if health[:healthy] true else # If health check failed, close the connection close false end end |
#write(data) ⇒ Object
Because nothing says “robust” like a method that can explode at any time.
Writes data to the socket.
184 185 186 187 188 189 190 191 |
# File 'lib/active_cypher/bolt/connection.rb', line 184 def write(data) raise ConnectionError, 'Not connected' unless connected? @socket.write(data) rescue Errno::EPIPE, IOError => e close raise ConnectionError, "Connection lost during write: #{e.}" end |
#write_message(message, debug_label = nil) ⇒ Object
Because nothing says “enterprise” like chunked binary messages.
Writes a Bolt message using the MessageWriter, adding Bolt chunking.
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/active_cypher/bolt/connection.rb', line 391 def (, debug_label = nil) raise ConnectionError, 'Socket not open for writing' unless socket_open? if .is_a?(ActiveCypher::Bolt::Messaging::Run) dump '→ RUN', " #{.fields[0]} #{.fields[2].inspect}" # query & metadata end # 1. Pack the message into a temporary buffer = StringIO.new(+'', 'wb') writer = MessageWriter.new() writer.write() = .string = .bytesize # Debug output if a label was provided dump(debug_label, ) if debug_label # 2. Write the chunk header and data chunk_header = [].pack('n') write_raw(chunk_header) write_raw() write_raw("\x00\x00") # Chunk terminator # Ensure everything is sent @socket.flush rescue StandardError => e close raise ProtocolError, "Failed to write message: #{e.}" end |
#write_raw(bytes) ⇒ Object
Because sometimes you just want to feel close to the metal.
Writes raw bytes directly to the socket.
123 124 125 126 127 128 129 130 |
# File 'lib/active_cypher/bolt/connection.rb', line 123 def write_raw(bytes) raise ConnectionError, 'Socket not open for writing' unless socket_open? @socket.write(bytes) # Async::IO::Socket yields if blocked. rescue IOError, Errno::EPIPE => e close raise ConnectionError, "Connection lost during raw write: #{e.}" end |
#write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object
Synchronously execute a write transaction.
479 480 481 |
# File 'lib/active_cypher/bolt/connection.rb', line 479 def write_transaction(db: nil, timeout: nil, metadata: nil, &) session(database: db).write_transaction(db: db, timeout: timeout, metadata: , &) end |