Class: ActiveCypher::Bolt::Connection

Inherits:
Object
  • Object
show all
Includes:
VersionEncoding
Defined in:
lib/active_cypher/bolt/connection.rb

Constant Summary collapse

SUPPORTED_VERSIONS =
[5.8, 5.2].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, adapter, auth_token:, timeout_seconds: 15, secure: false, verify_cert: true) ⇒ Connection

Note:

The ceremony required to instantiate a connection. Because nothing says “enterprise” like 8 arguments.

Initializes a new Bolt connection.

Parameters:

  • host (String)

    the database host

  • port (Integer)

    the database port

  • adapter (Object)

    the adapter using this connection

  • auth_token (Hash)

    authentication token

  • timeout_seconds (Integer) (defaults to: 15)

    connection timeout in seconds

  • secure (Boolean) (defaults to: false)

    whether to use SSL

  • verify_cert (Boolean) (defaults to: true)

    whether to verify SSL certificates



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/active_cypher/bolt/connection.rb', line 53

def initialize(host, port, adapter,
               auth_token:, timeout_seconds: 15,
               secure: false, verify_cert: true)
  @host               = host
  @port               = port
  @auth_token         = auth_token
  @timeout_seconds    = timeout_seconds
  @secure             = secure
  @verify_cert        = verify_cert
  @adapter            = adapter

  @socket             = nil
  @connected          = false
  @protocol_version   = nil
  @server_agent       = nil
  @connection_id      = nil
  @reconnect_attempts = 0
  @max_reconnect_attempts = 3
end

Instance Attribute Details

#adapterObject (readonly)

Returns the value of attribute adapter.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def adapter
  @adapter
end

#connection_idObject (readonly)

Returns the value of attribute connection_id.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def connection_id
  @connection_id
end

#hostObject (readonly)

Returns the value of attribute host.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def port
  @port
end

#protocol_handlerObject (readonly)

Access to the protocol handler



440
441
442
# File 'lib/active_cypher/bolt/connection.rb', line 440

def protocol_handler
  @protocol_handler
end

#protocol_versionObject (readonly)

Returns the value of attribute protocol_version.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def protocol_version
  @protocol_version
end

#server_agentObject (readonly)

Returns the value of attribute server_agent.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def server_agent
  @server_agent
end

#socketObject (readonly)

Returns the value of attribute socket.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def socket
  @socket
end

#timeout_secondsObject (readonly)

Returns the value of attribute timeout_seconds.



17
18
19
# File 'lib/active_cypher/bolt/connection.rb', line 17

def timeout_seconds
  @timeout_seconds
end

Instance Method Details

#async_read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object

Asynchronously execute a read transaction.



484
485
486
# File 'lib/active_cypher/bolt/connection.rb', line 484

def async_read_transaction(db: nil, timeout: nil, metadata: nil, &)
  session(database: db).async_read_transaction(db: db, timeout: timeout, metadata: , &)
end

#async_write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object

Asynchronously execute a write transaction.



489
490
491
# File 'lib/active_cypher/bolt/connection.rb', line 489

def async_write_transaction(db: nil, timeout: nil, metadata: nil, &)
  session(database: db).async_write_transaction(db: db, timeout: timeout, metadata: , &)
end

#closeObject

Note:

The digital equivalent of ghosting.

Closes the TCP connection if it’s open.



135
136
137
138
139
140
141
# File 'lib/active_cypher/bolt/connection.rb', line 135

def close
  @socket.close if connected?
rescue IOError
ensure
  @socket = nil
  @connected = false
end

#concurrencyInteger

A single Bolt socket is strictly single‑plex:

Returns:

  • (Integer)

    always 1, because concurrency is for people with more optimistic protocols.



219
# File 'lib/active_cypher/bolt/connection.rb', line 219

def concurrency = 1

#connectObject

Note:

Attempts to connect, or at least to feel something.

Establishes the connection to the database.

Raises:



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/active_cypher/bolt/connection.rb', line 80

def connect
  return if connected?

  # Using a variable to track errors instead of re-raising inside the Async block
  error = nil

  begin
    Async do |task|
      task.with_timeout(@timeout_seconds) do
        @socket = open_socket
        perform_handshake
        @connected          = true
        @reconnect_attempts = 0
      end
    rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH,
           SocketError, OpenSSL::SSL::SSLError => e
      # Catch connection errors inside the task
      close
      # Store the error instead of raising
      error = ConnectionError.new("Failed to connect to #{host}:#{port} - #{e.message}")
    rescue StandardError => e
      # Catch any other errors inside the task
      close
      # Store the error instead of raising
      error = ConnectionError.new("Error during connection: #{e.message}")
    end.wait
  rescue Async::TimeoutError => e
    error = ConnectionError.new("Connection timed out to #{host}:#{port} - #{e.message}")
  rescue StandardError => e
    close
    error = ConnectionError.new("Connection error: #{e.message}")
  end

  # After the Async block is complete, raise the error if one occurred
  raise error if error
end

#connected?Boolean

Note:

Checks if we’re still pretending to be connected.

Checks if the connection is open and the socket is alive.

Returns:

  • (Boolean)


147
# File 'lib/active_cypher/bolt/connection.rb', line 147

def connected? = @connected && socket_open?

#database_infoHash

Returns comprehensive database information.

Returns:

  • (Hash)

    database information including version, health, and system details



544
545
546
547
548
549
550
551
552
553
554
555
# File 'lib/active_cypher/bolt/connection.rb', line 544

def database_info
  info = version.dup
  health = health_check

  info.merge({
               healthy: health[:healthy],
               response_time_ms: health[:response_time_ms],
               server_agent: @server_agent,
               connection_id: @connection_id,
               protocol_version: @protocol_version
             })
end

#database_typeSymbol

Returns the database type detected from server agent.

Returns:

  • (Symbol)

    :neo4j, :memgraph, or :unknown



510
511
512
# File 'lib/active_cypher/bolt/connection.rb', line 510

def database_type
  version[:database_type]
end

#dump(label, bytes) ⇒ Object

Debug output for those who enjoy hexadecimal existentialism.

Parameters:

  • label (String)
  • bytes (String)


212
213
214
# File 'lib/active_cypher/bolt/connection.rb', line 212

def dump(label, bytes)
  puts "[DEBUG] #{label.ljust(18)}: #{bytes.bytes.map { |b| b.to_s(16).rjust(2, '0') }.join(' ')}" if ENV['DEBUG']
end

#handle_hello_failure(metadata) ⇒ Object

Note:

The more common outcome.

Handles a FAILURE response to HELLO.

Raises:



377
378
379
380
381
382
# File 'lib/active_cypher/bolt/connection.rb', line 377

def handle_hello_failure()
  code    = ['code']
  message = ['message']
  close
  raise ConnectionError, "Authentication failed: #{code} - #{message}"
end

#handle_hello_success(metadata) ⇒ Object

Note:

The rarest of all outcomes.

Handles a SUCCESS response to HELLO.



369
370
371
372
# File 'lib/active_cypher/bolt/connection.rb', line 369

def handle_hello_success()
  @connection_id = ['connection_id']
  @server_agent  = ['server']
end

#health_checkHash

Note:

Uses different queries based on detected database type

Performs a health check using database-appropriate queries.

Returns:

  • (Hash)

    health check result with :healthy, :response_time_ms, :details



518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
# File 'lib/active_cypher/bolt/connection.rb', line 518

def health_check
  return { healthy: false, response_time_ms: nil, details: 'Not connected' } unless connected?

  result = nil

  begin
    Async do
      result = case database_type
               when :neo4j
                 perform_neo4j_health_check
               when :memgraph
                 perform_memgraph_health_check
               else
                 perform_generic_health_check
               end
    end.wait

    result
  rescue ConnectionError, ProtocolError => e
    { healthy: false, response_time_ms: nil, details: "Health check failed: #{e.message}" }
  end
end

#inspectObject

Override inspect to redact sensitive information



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/active_cypher/bolt/connection.rb', line 21

def inspect
  filtered_auth = ActiveCypher::Redaction.filter_hash(@auth_token)

  attributes = {
    host: @host.inspect,
    port: @port.inspect,
    auth_token: filtered_auth.inspect,
    timeout_seconds: @timeout_seconds.inspect,
    secure: @secure.inspect,
    verify_cert: @verify_cert.inspect,
    connected: @connected.inspect,
    protocol_version: @protocol_version.inspect,
    server_agent: @server_agent.inspect,
    connection_id: @connection_id.inspect
  }

  "#<#{self.class.name}:0x#{object_id.to_s(16)} #{attributes.map { |k, v| "@#{k}=#{v}" }.join(', ')}>"
end

#perform_handshakeObject

Note:

The digital equivalent of a secret handshake, but with more bytes and less trust.

Performs the Bolt handshake sequence.

Raises:



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/active_cypher/bolt/connection.rb', line 250

def perform_handshake
  # Bolt Magic Preamble (0x6060B017)
  magic = "\x60\x60\xB0\x17"
  dump('Magic', magic)
  write_raw(magic)

  # Proposed Bolt Versions (ordered by preference)
  # Encoded as 4‑byte big‑endian integers
  proposed_versions = (SUPPORTED_VERSIONS + [0, 0])[0, 4]
  versions = proposed_versions.map { |v| encode_version(v) }.join
  dump('Sending versions', versions)
  write_raw(versions)

  # Read agreed version (4 bytes)
  agreed_version_bytes = read_raw(4)
  dump('Agreed version', agreed_version_bytes)
  @protocol_version = decode_version(agreed_version_bytes)

  # Validate agreed version
  unless SUPPORTED_VERSIONS.include?(@protocol_version)
    close
    raise ProtocolError,
          "Server only supports unsupported Bolt protocol (#{@protocol_version}). This client requires one of: #{SUPPORTED_VERSIONS.join(', ')}"
  end

  # Send HELLO message
  send_hello

  # Read response (should be SUCCESS or FAILURE)
  response = begin
    msg = read_message
    msg
  rescue EOFError => e
    raise ConnectionError, "Server closed connection: #{e.message}"
  end

  case response
  when Messaging::Success
    handle_hello_success(response.)

    # if auth credentials were provided, send LOGON
    send_logon if @auth_token && @auth_token[:scheme] == 'basic'

    # Let adapter create protocol handler instead of directly instantiating
    @protocol_handler = @adapter.create_protocol_handler(self)
  when Messaging::Failure
    handle_hello_failure(response.)
  else
    close
    raise ProtocolError, "Unexpected response during handshake: #{response.class}"
  end
rescue ConnectionError, ProtocolError => e
  close
  raise e
rescue StandardError => e
  close
  raise ConnectionError, "Handshake error: #{e.message}"
end

#read(length) ⇒ Object

Note:

Reading from the void, hoping something meaningful comes back.

Reads data from the socket.

Parameters:

  • length (Integer)

    number of bytes to read

Raises:



199
200
201
202
203
204
205
206
# File 'lib/active_cypher/bolt/connection.rb', line 199

def read(length)
  raise ConnectionError, 'Not connected' unless connected?

  @socket.read_exactly(length)
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, IOError => e
  close
  raise ConnectionError, "Connection lost during read: #{e.message}"
end

#read_messageObject

Note:

Reads from the abyss and hopes for a message, not a void.

Reads a Bolt message using the MessageReader.

Returns:

  • (Object)

    the Bolt message

Raises:



426
427
428
429
430
431
432
433
434
435
436
437
# File 'lib/active_cypher/bolt/connection.rb', line 426

def read_message
  raise ConnectionError, 'Socket not open for reading' unless socket_open?

  reader = MessageReader.new(@socket)
  reader.read_message
rescue ConnectionError, ProtocolError => e
  close
  raise e
rescue EOFError => e
  close
  raise ConnectionError, "Connection closed unexpectedly: #{e.message}"
end

#read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object

Synchronously execute a read transaction.



474
475
476
# File 'lib/active_cypher/bolt/connection.rb', line 474

def read_transaction(db: nil, timeout: nil, metadata: nil, &)
  session(database: db).read_transaction(db: db, timeout: timeout, metadata: , &)
end

#reconnectBoolean

Note:

Attempts to reconnect, because hope springs eternal.

Attempts to reconnect if the connection is lost.

Returns:

  • (Boolean)

    True if reconnection was successful, false otherwise



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/active_cypher/bolt/connection.rb', line 153

def reconnect
  return true if connected?

  @reconnect_attempts += 1
  if @reconnect_attempts <= @max_reconnect_attempts
    close
    begin
      connect
      # Reset reconnect counter on successful connection
      @reconnect_attempts = 0
      true
    rescue ConnectionError => e
      # Log the error but don't raise
      puts "Reconnection attempt #{@reconnect_attempts}/#{@max_reconnect_attempts} failed: #{e.message}" if ENV['DEBUG']
      # Sleep to avoid hammering the server
      sleep(0.1 * @reconnect_attempts)
      false
    end
  else
    # Reset the counter after max attempts to allow future reconnects
    @reconnect_attempts = 0
    false
  end
end

#reset!Boolean

Note:

For when you want to pretend nothing ever happened.

Resets the connection state.

Returns:

  • (Boolean)

    true if reset succeeded, false otherwise



446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
# File 'lib/active_cypher/bolt/connection.rb', line 446

def reset!
  return false unless connected?

  begin
    write_message(ActiveCypher::Bolt::Messaging::Reset.new)
    msg = read_message # should be Messaging::Success
    msg.is_a?(ActiveCypher::Bolt::Messaging::Success)
  rescue ConnectionError, ProtocolError
    # If reset fails, close the connection
    close
    false
  rescue StandardError
    # For any other errors, also close the connection
    close
    false
  end
end

#reusable?Boolean

Re‑use only if still alive:

Returns:

  • (Boolean)


224
# File 'lib/active_cypher/bolt/connection.rb', line 224

def reusable?   = connected?

#send_helloObject

Note:

Because every protocol needs a little small talk before the pain begins.

Sends the HELLO message.



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/active_cypher/bolt/connection.rb', line 312

def send_hello
  user_agent = "ActiveCypher::Bolt/#{ActiveCypher::VERSION} (Ruby/#{RUBY_VERSION})"
  platform = RUBY_DESCRIPTION.split[1..].join(' ') # Gets everything after "ruby" in RUBY_DESCRIPTION
   = {
    'user_agent' => user_agent,
    'notifications_minimum_severity' => 'WARNING',
    'bolt_agent' => {
      'product' => user_agent,
      'platform' => platform,
      'language' => "#{RUBY_PLATFORM}/#{RUBY_VERSION}",
      'language_details' => "#{RUBY_ENGINE} #{RUBY_ENGINE_VERSION}"
    }
  }
  hello_message = Messaging::Hello.new()
  write_message(hello_message, 'HELLO')
end

#send_logonObject

Note:

Because authentication is just another opportunity for disappointment.

Sends the LOGON message.



332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/active_cypher/bolt/connection.rb', line 332

def send_logon
  # Get credentials from the connection's auth token
   = {
    'scheme' => @auth_token[:scheme],
    'principal' => @auth_token[:principal],
    'credentials' => @auth_token[:credentials]
  }

  # Create and send LOGON message
  begin
    logon_msg = Messaging::Logon.new()
    write_message(logon_msg, 'LOGON')

    # Read and process response
    logon_response = read_message

    case logon_response
    when Messaging::Success
      true
    when Messaging::Failure
      code = logon_response.['code']
      message = logon_response.['message']
      close
      raise ConnectionError, "Authentication failed during LOGON: #{code} - #{message}"
    else
      close
      raise ProtocolError, "Unexpected response to LOGON: #{logon_response.class}"
    end
  rescue StandardError => e
    close
    raise ConnectionError, "Authentication error: #{e.message}"
  end
end

#server_statusArray<Hash>?

Returns server/cluster status information (when available).

Returns:

  • (Array<Hash>, nil)

    array of server status objects or nil if not supported



560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
# File 'lib/active_cypher/bolt/connection.rb', line 560

def server_status
  return nil unless connected?

  begin
    case database_type
    when :neo4j
      get_neo4j_server_status
    when :memgraph
      get_memgraph_server_status
    else
      nil
    end
  rescue ConnectionError, ProtocolError
    nil # Gracefully handle unsupported operations
  end
end

#sessionBolt::Session

Note:

Because every connection deserves a second chance.

Returns a fresh Session object that re‑uses this TCP/Bolt socket.

Parameters:

  • **kwargs

    passed to the Session initializer

Returns:



469
470
471
# File 'lib/active_cypher/bolt/connection.rb', line 469

def session(**)
  Bolt::Session.new(self, **)
end

#versionHash

Note:

Extracts version from server_agent captured during handshake

Returns parsed version information from the server agent string.

Returns:

  • (Hash)

    version information with :database_type, :version, :major, :minor, :patch



501
502
503
504
505
# File 'lib/active_cypher/bolt/connection.rb', line 501

def version
  return @version if defined?(@version)

  @version = parse_version_from_server_agent
end

#viable?Boolean

Note:

The database equivalent of “are you still there?”

This method is required by Async::Pool to check if the connection is viable for reuse

Returns:

  • (Boolean)


230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/active_cypher/bolt/connection.rb', line 230

def viable?
  return false unless connected?

  # Use the health check method to determine viability
  health = health_check

  if health[:healthy]
    true
  else
    # If health check failed, close the connection
    close
    false
  end
end

#write(data) ⇒ Object

Note:

Because nothing says “robust” like a method that can explode at any time.

Writes data to the socket.

Parameters:

  • data (String)

    the data to write

Raises:



184
185
186
187
188
189
190
191
# File 'lib/active_cypher/bolt/connection.rb', line 184

def write(data)
  raise ConnectionError, 'Not connected' unless connected?

  @socket.write(data)
rescue Errno::EPIPE, IOError => e
  close
  raise ConnectionError, "Connection lost during write: #{e.message}"
end

#write_message(message, debug_label = nil) ⇒ Object

Note:

Because nothing says “enterprise” like chunked binary messages.

Writes a Bolt message using the MessageWriter, adding Bolt chunking.

Parameters:

  • message (Object)

    the Bolt message to write

  • debug_label (String, nil) (defaults to: nil)

    optional debug label

Raises:



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/active_cypher/bolt/connection.rb', line 391

def write_message(message, debug_label = nil)
  raise ConnectionError, 'Socket not open for writing' unless socket_open?

  if message.is_a?(ActiveCypher::Bolt::Messaging::Run)
    dump '→ RUN', " #{message.fields[0]} #{message.fields[2].inspect}" # query & metadata
  end
  # 1. Pack the message into a temporary buffer
  message_io = StringIO.new(+'', 'wb')
  writer = MessageWriter.new(message_io)
  writer.write(message)
  message_bytes = message_io.string
  message_size = message_bytes.bytesize

  # Debug output if a label was provided
  dump(debug_label, message_bytes) if debug_label

  # 2. Write the chunk header and data
  chunk_header = [message_size].pack('n')
  write_raw(chunk_header)
  write_raw(message_bytes)
  write_raw("\x00\x00") # Chunk terminator

  # Ensure everything is sent
  @socket.flush
rescue StandardError => e
  close
  raise ProtocolError, "Failed to write message: #{e.message}"
end

#write_raw(bytes) ⇒ Object

Note:

Because sometimes you just want to feel close to the metal.

Writes raw bytes directly to the socket.

Parameters:

  • bytes (String)

    the bytes to write

Raises:



123
124
125
126
127
128
129
130
# File 'lib/active_cypher/bolt/connection.rb', line 123

def write_raw(bytes)
  raise ConnectionError, 'Socket not open for writing' unless socket_open?

  @socket.write(bytes) # Async::IO::Socket yields if blocked.
rescue IOError, Errno::EPIPE => e
  close
  raise ConnectionError, "Connection lost during raw write: #{e.message}"
end

#write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object

Synchronously execute a write transaction.



479
480
481
# File 'lib/active_cypher/bolt/connection.rb', line 479

def write_transaction(db: nil, timeout: nil, metadata: nil, &)
  session(database: db).write_transaction(db: db, timeout: timeout, metadata: , &)
end