Class: ActiveCypher::Bolt::Connection

Inherits:
Object
  • Object
show all
Includes:
VersionEncoding
Defined in:
lib/active_cypher/bolt/connection.rb

Constant Summary collapse

SUPPORTED_VERSIONS =
[5.8, 5.2].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, adapter, auth_token:, timeout_seconds: 15, secure: false, verify_cert: true) ⇒ Connection

Note:

The ceremony required to instantiate a connection. Because nothing says “enterprise” like 8 arguments.

Initializes a new Bolt connection.

Parameters:

  • host (String)

    the database host

  • port (Integer)

    the database port

  • adapter (Object)

    the adapter using this connection

  • auth_token (Hash)

    authentication token

  • timeout_seconds (Integer) (defaults to: 15)

    connection timeout in seconds

  • secure (Boolean) (defaults to: false)

    whether to use SSL

  • verify_cert (Boolean) (defaults to: true)

    whether to verify SSL certificates



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/active_cypher/bolt/connection.rb', line 54

def initialize(host, port, adapter,
               auth_token:, timeout_seconds: 15,
               secure: false, verify_cert: true)
  @host               = host
  @port               = port
  @auth_token         = auth_token
  @timeout_seconds    = timeout_seconds
  @secure             = secure
  @verify_cert        = verify_cert
  @adapter            = adapter

  @socket             = nil
  @connected          = false
  @protocol_version   = nil
  @server_agent       = nil
  @connection_id      = nil
  @reconnect_attempts = 0
  @max_reconnect_attempts = 3
  @count = 0
end

Instance Attribute Details

#adapterObject (readonly)

Returns the value of attribute adapter.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def adapter
  @adapter
end

#connection_idObject (readonly)

Returns the value of attribute connection_id.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def connection_id
  @connection_id
end

#countObject (readonly)

Returns the value of attribute count.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def count
  @count
end

#hostObject (readonly)

Returns the value of attribute host.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def port
  @port
end

#protocol_handlerObject (readonly)

Access to the protocol handler



445
446
447
# File 'lib/active_cypher/bolt/connection.rb', line 445

def protocol_handler
  @protocol_handler
end

#protocol_versionObject (readonly)

Returns the value of attribute protocol_version.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def protocol_version
  @protocol_version
end

#server_agentObject (readonly)

Returns the value of attribute server_agent.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def server_agent
  @server_agent
end

#socketObject (readonly)

Returns the value of attribute socket.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def socket
  @socket
end

#timeout_secondsObject (readonly)

Returns the value of attribute timeout_seconds.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def timeout_seconds
  @timeout_seconds
end

Instance Method Details

#closeObject

Note:

The digital equivalent of ghosting.

Closes the TCP connection if it’s open.



137
138
139
140
141
142
143
# File 'lib/active_cypher/bolt/connection.rb', line 137

def close
  @socket.close if connected?
rescue IOError
ensure
  @socket = nil
  @connected = false
end

#concurrencyInteger

A single Bolt socket is strictly single‑plex:

Returns:

  • (Integer)

    always 1, because concurrency is for people with more optimistic protocols.



221
# File 'lib/active_cypher/bolt/connection.rb', line 221

def concurrency = 1

#connectObject

Note:

Attempts to connect, or at least to feel something.

Establishes the connection to the database.

Raises:



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/active_cypher/bolt/connection.rb', line 82

def connect
  return if connected?

  # Using a variable to track errors instead of re-raising inside the Async block
  error = nil

  begin
    Sync do |task|
      task.with_timeout(@timeout_seconds) do
        @socket = open_socket
        perform_handshake
        @connected          = true
        @reconnect_attempts = 0
      end
    rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH,
           SocketError, OpenSSL::SSL::SSLError => e
      # Catch connection errors inside the task
      close
      # Store the error instead of raising
      error = ConnectionError.new("Failed to connect to #{host}:#{port} - #{e.message}")
    rescue StandardError => e
      # Catch any other errors inside the task
      close
      # Store the error instead of raising
      error = ConnectionError.new("Error during connection: #{e.message}")
    end
  rescue Async::TimeoutError => e
    error = ConnectionError.new("Connection timed out to #{host}:#{port} - #{e.message}")
  rescue StandardError => e
    close
    error = ConnectionError.new("Connection error: #{e.message}")
  end

  # After the Async block is complete, raise the error if one occurred
  raise error if error
end

#connected?Boolean

Note:

Checks if we’re still pretending to be connected.

Checks if the connection is open and the socket is alive.

Returns:

  • (Boolean)


149
# File 'lib/active_cypher/bolt/connection.rb', line 149

def connected? = @connected && socket_open?

#database_infoHash

Returns comprehensive database information.

Returns:

  • (Hash)

    database information including version, health, and system details



540
541
542
543
544
545
546
547
548
549
550
551
# File 'lib/active_cypher/bolt/connection.rb', line 540

def database_info
  info = version.dup
  health = health_check

  info.merge({
               healthy: health[:healthy],
               response_time_ms: health[:response_time_ms],
               server_agent: @server_agent,
               connection_id: @connection_id,
               protocol_version: @protocol_version
             })
end

#database_typeSymbol

Returns the database type detected from server agent.

Returns:

  • (Symbol)

    :neo4j, :memgraph, or :unknown



506
507
508
# File 'lib/active_cypher/bolt/connection.rb', line 506

def database_type
  version[:database_type]
end

#dump(label, bytes) ⇒ Object

Debug output for those who enjoy hexadecimal existentialism.

Parameters:

  • label (String)
  • bytes (String)


214
215
216
# File 'lib/active_cypher/bolt/connection.rb', line 214

def dump(label, bytes)
  puts "[DEBUG] #{label.ljust(18)}: #{bytes.bytes.map { |b| b.to_s(16).rjust(2, '0') }.join(' ')}" if ENV['DEBUG']
end

#handle_hello_failure(metadata) ⇒ Object

Note:

The more common outcome.

Handles a FAILURE response to HELLO.

Raises:



382
383
384
385
386
387
# File 'lib/active_cypher/bolt/connection.rb', line 382

def handle_hello_failure()
  code    = ['code']
  message = ['message']
  close
  raise ConnectionError, "Authentication failed: #{code} - #{message}"
end

#handle_hello_success(metadata) ⇒ Object

Note:

The rarest of all outcomes.

Handles a SUCCESS response to HELLO.



374
375
376
377
# File 'lib/active_cypher/bolt/connection.rb', line 374

def handle_hello_success()
  @connection_id = ['connection_id']
  @server_agent  = ['server']
end

#health_checkHash

Note:

Uses different queries based on detected database type

Performs a health check using database-appropriate queries.

Returns:

  • (Hash)

    health check result with :healthy, :response_time_ms, :details



514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
# File 'lib/active_cypher/bolt/connection.rb', line 514

def health_check
  return { healthy: false, response_time_ms: nil, details: 'Not connected' } unless connected?

  result = nil

  begin
    result = Sync do
      case database_type
      when :neo4j
        perform_neo4j_health_check
      when :memgraph
        perform_memgraph_health_check
      else
        perform_generic_health_check
      end
    end

    result
  rescue ConnectionError, ProtocolError => e
    { healthy: false, response_time_ms: nil, details: "Health check failed: #{e.message}" }
  end
end

#inspectObject

Override inspect to redact sensitive information



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/active_cypher/bolt/connection.rb', line 22

def inspect
  filtered_auth = ActiveCypher::Redaction.filter_hash(@auth_token)

  attributes = {
    host: @host.inspect,
    port: @port.inspect,
    auth_token: filtered_auth.inspect,
    timeout_seconds: @timeout_seconds.inspect,
    secure: @secure.inspect,
    verify_cert: @verify_cert.inspect,
    connected: @connected.inspect,
    protocol_version: @protocol_version.inspect,
    server_agent: @server_agent.inspect,
    connection_id: @connection_id.inspect
  }

  "#<#{self.class.name}:0x#{object_id.to_s(16)} #{attributes.map { |k, v| "@#{k}=#{v}" }.join(', ')}>"
end

#mark_used!Object

Increment the usage counter for compatibility with Async::Pool diagnostics.



229
# File 'lib/active_cypher/bolt/connection.rb', line 229

def mark_used! = @count += 1

#perform_handshakeObject

Note:

The digital equivalent of a secret handshake, but with more bytes and less trust.

Performs the Bolt handshake sequence.

Raises:



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/active_cypher/bolt/connection.rb', line 255

def perform_handshake
  # Bolt Magic Preamble (0x6060B017)
  magic = "\x60\x60\xB0\x17"
  dump('Magic', magic)
  write_raw(magic)

  # Proposed Bolt Versions (ordered by preference)
  # Encoded as 4‑byte big‑endian integers
  proposed_versions = (SUPPORTED_VERSIONS + [0, 0])[0, 4]
  versions = proposed_versions.map { |v| encode_version(v) }.join
  dump('Sending versions', versions)
  write_raw(versions)

  # Read agreed version (4 bytes)
  agreed_version_bytes = read_raw(4)
  dump('Agreed version', agreed_version_bytes)
  @protocol_version = decode_version(agreed_version_bytes)

  # Validate agreed version
  unless SUPPORTED_VERSIONS.include?(@protocol_version)
    close
    raise ProtocolError,
          "Server only supports unsupported Bolt protocol (#{@protocol_version}). This client requires one of: #{SUPPORTED_VERSIONS.join(', ')}"
  end

  # Send HELLO message
  send_hello

  # Read response (should be SUCCESS or FAILURE)
  response = begin
    msg = read_message
    msg
  rescue EOFError => e
    raise ConnectionError, "Server closed connection: #{e.message}"
  end

  case response
  when Messaging::Success
    handle_hello_success(response.)

    # if auth credentials were provided, send LOGON
    send_logon if @auth_token && @auth_token[:scheme] == 'basic'

    # Let adapter create protocol handler instead of directly instantiating
    @protocol_handler = @adapter.create_protocol_handler(self)
  when Messaging::Failure
    handle_hello_failure(response.)
  else
    close
    raise ProtocolError, "Unexpected response during handshake: #{response.class}"
  end
rescue ConnectionError, ProtocolError => e
  close
  raise e
rescue StandardError => e
  close
  raise ConnectionError, "Handshake error: #{e.message}"
end

#read(length) ⇒ Object

Note:

Reading from the void, hoping something meaningful comes back.

Reads data from the socket.

Parameters:

  • length (Integer)

    number of bytes to read

Raises:



201
202
203
204
205
206
207
208
# File 'lib/active_cypher/bolt/connection.rb', line 201

def read(length)
  raise ConnectionError, 'Not connected' unless connected?

  @socket.read_exactly(length)
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, IOError => e
  close
  raise ConnectionError, "Connection lost during read: #{e.message}"
end

#read_messageObject

Note:

Reads from the abyss and hopes for a message, not a void.

Reads a Bolt message using the MessageReader.

Returns:

  • (Object)

    the Bolt message

Raises:



431
432
433
434
435
436
437
438
439
440
441
442
# File 'lib/active_cypher/bolt/connection.rb', line 431

def read_message
  raise ConnectionError, 'Socket not open for reading' unless socket_open?

  reader = MessageReader.new(@socket)
  reader.read_message
rescue ConnectionError, ProtocolError => e
  close
  raise e
rescue EOFError => e
  close
  raise ConnectionError, "Connection closed unexpectedly: #{e.message}"
end

#read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object

Synchronously execute a read transaction.



479
480
481
# File 'lib/active_cypher/bolt/connection.rb', line 479

def read_transaction(db: nil, timeout: nil, metadata: nil, &)
  session(database: db).read_transaction(db: db, timeout: timeout, metadata: , &)
end

#reconnectBoolean

Note:

Attempts to reconnect, because hope springs eternal.

Attempts to reconnect if the connection is lost.

Returns:

  • (Boolean)

    True if reconnection was successful, false otherwise



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/active_cypher/bolt/connection.rb', line 155

def reconnect
  return true if connected?

  @reconnect_attempts += 1
  if @reconnect_attempts <= @max_reconnect_attempts
    close
    begin
      connect
      # Reset reconnect counter on successful connection
      @reconnect_attempts = 0
      true
    rescue ConnectionError => e
      # Log the error but don't raise
      puts "Reconnection attempt #{@reconnect_attempts}/#{@max_reconnect_attempts} failed: #{e.message}" if ENV['DEBUG']
      # Sleep to avoid hammering the server
      sleep(0.1 * @reconnect_attempts)
      false
    end
  else
    # Reset the counter after max attempts to allow future reconnects
    @reconnect_attempts = 0
    false
  end
end

#reset!Boolean

Note:

For when you want to pretend nothing ever happened.

Resets the connection state.

Returns:

  • (Boolean)

    true if reset succeeded, false otherwise



451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/active_cypher/bolt/connection.rb', line 451

def reset!
  return false unless connected?

  begin
    write_message(ActiveCypher::Bolt::Messaging::Reset.new)
    msg = read_message # should be Messaging::Success
    msg.is_a?(ActiveCypher::Bolt::Messaging::Success)
  rescue ConnectionError, ProtocolError
    # If reset fails, close the connection
    close
    false
  rescue StandardError
    # For any other errors, also close the connection
    close
    false
  end
end

#reusable?Boolean

Re‑use only if still alive:

Returns:

  • (Boolean)


226
# File 'lib/active_cypher/bolt/connection.rb', line 226

def reusable?   = connected?

#send_helloObject

Note:

Because every protocol needs a little small talk before the pain begins.

Sends the HELLO message.



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/active_cypher/bolt/connection.rb', line 317

def send_hello
  user_agent = "ActiveCypher::Bolt/#{ActiveCypher::VERSION} (Ruby/#{RUBY_VERSION})"
  platform = RUBY_DESCRIPTION.split[1..].join(' ') # Gets everything after "ruby" in RUBY_DESCRIPTION
   = {
    'user_agent' => user_agent,
    'notifications_minimum_severity' => 'WARNING',
    'bolt_agent' => {
      'product' => user_agent,
      'platform' => platform,
      'language' => "#{RUBY_PLATFORM}/#{RUBY_VERSION}",
      'language_details' => "#{RUBY_ENGINE} #{RUBY_ENGINE_VERSION}"
    }
  }
  hello_message = Messaging::Hello.new()
  write_message(hello_message, 'HELLO')
end

#send_logonObject

Note:

Because authentication is just another opportunity for disappointment.

Sends the LOGON message.



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/active_cypher/bolt/connection.rb', line 337

def send_logon
  # Get credentials from the connection's auth token
   = {
    'scheme' => @auth_token[:scheme],
    'principal' => @auth_token[:principal],
    'credentials' => @auth_token[:credentials]
  }

  # Create and send LOGON message
  begin
    logon_msg = Messaging::Logon.new()
    write_message(logon_msg, 'LOGON')

    # Read and process response
    logon_response = read_message

    case logon_response
    when Messaging::Success
      true
    when Messaging::Failure
      code = logon_response.['code']
      message = logon_response.['message']
      close
      raise ConnectionError, "Authentication failed during LOGON: #{code} - #{message}"
    else
      close
      raise ProtocolError, "Unexpected response to LOGON: #{logon_response.class}"
    end
  rescue StandardError => e
    close
    raise ConnectionError, "Authentication error: #{e.message}"
  end
end

#server_statusArray<Hash>?

Returns server/cluster status information (when available).

Returns:

  • (Array<Hash>, nil)

    array of server status objects or nil if not supported



556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
# File 'lib/active_cypher/bolt/connection.rb', line 556

def server_status
  return nil unless connected?

  begin
    case database_type
    when :neo4j
      get_neo4j_server_status
    when :memgraph
      get_memgraph_server_status
    else
      nil
    end
  rescue ConnectionError, ProtocolError
    nil # Gracefully handle unsupported operations
  end
end

#sessionBolt::Session

Note:

Because every connection deserves a second chance.

Returns a fresh Session object that re‑uses this TCP/Bolt socket.

Parameters:

  • **kwargs

    passed to the Session initializer

Returns:



474
475
476
# File 'lib/active_cypher/bolt/connection.rb', line 474

def session(**)
  Bolt::Session.new(self, **)
end

#versionHash

Note:

Extracts version from server_agent captured during handshake

Returns parsed version information from the server agent string.

Returns:

  • (Hash)

    version information with :database_type, :version, :major, :minor, :patch



497
498
499
500
501
# File 'lib/active_cypher/bolt/connection.rb', line 497

def version
  return @version if defined?(@version)

  @version = parse_version_from_server_agent
end

#viable?Boolean

Note:

The database equivalent of “are you still there?”

This method is required by Async::Pool to check if the connection is viable for reuse

Returns:

  • (Boolean)


235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/active_cypher/bolt/connection.rb', line 235

def viable?
  return false unless connected?

  # Use the health check method to determine viability
  health = health_check

  if health[:healthy]
    true
  else
    # If health check failed, close the connection
    close
    false
  end
end

#write(data) ⇒ Object

Note:

Because nothing says “robust” like a method that can explode at any time.

Writes data to the socket.

Parameters:

  • data (String)

    the data to write

Raises:



186
187
188
189
190
191
192
193
# File 'lib/active_cypher/bolt/connection.rb', line 186

def write(data)
  raise ConnectionError, 'Not connected' unless connected?

  @socket.write(data)
rescue Errno::EPIPE, IOError => e
  close
  raise ConnectionError, "Connection lost during write: #{e.message}"
end

#write_message(message, debug_label = nil) ⇒ Object

Note:

Because nothing says “enterprise” like chunked binary messages.

Writes a Bolt message using the MessageWriter, adding Bolt chunking.

Parameters:

  • message (Object)

    the Bolt message to write

  • debug_label (String, nil) (defaults to: nil)

    optional debug label

Raises:



396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
# File 'lib/active_cypher/bolt/connection.rb', line 396

def write_message(message, debug_label = nil)
  raise ConnectionError, 'Socket not open for writing' unless socket_open?

  if message.is_a?(ActiveCypher::Bolt::Messaging::Run)
    dump '→ RUN', " #{message.fields[0]} #{message.fields[2].inspect}" # query & metadata
  end
  # 1. Pack the message into a temporary buffer
  message_io = StringIO.new(+'', 'wb')
  writer = MessageWriter.new(message_io)
  writer.write(message)
  message_bytes = message_io.string
  message_size = message_bytes.bytesize

  # Debug output if a label was provided
  dump(debug_label, message_bytes) if debug_label

  # 2. Write the chunk header and data
  chunk_header = [message_size].pack('n')
  write_raw(chunk_header)
  write_raw(message_bytes)
  write_raw("\x00\x00") # Chunk terminator

  # Ensure everything is sent
  @socket.flush
rescue StandardError => e
  close
  raise ProtocolError, "Failed to write message: #{e.message}"
end

#write_raw(bytes) ⇒ Object

Note:

Because sometimes you just want to feel close to the metal.

Writes raw bytes directly to the socket.

Parameters:

  • bytes (String)

    the bytes to write

Raises:



125
126
127
128
129
130
131
132
# File 'lib/active_cypher/bolt/connection.rb', line 125

def write_raw(bytes)
  raise ConnectionError, 'Socket not open for writing' unless socket_open?

  @socket.write(bytes) # Async::IO::Socket yields if blocked.
rescue IOError, Errno::EPIPE => e
  close
  raise ConnectionError, "Connection lost during raw write: #{e.message}"
end

#write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object

Synchronously execute a write transaction.



484
485
486
# File 'lib/active_cypher/bolt/connection.rb', line 484

def write_transaction(db: nil, timeout: nil, metadata: nil, &)
  session(database: db).write_transaction(db: db, timeout: timeout, metadata: , &)
end