Class: ActiveCypher::ConnectionAdapters::AbstractBoltAdapter

Inherits:
AbstractAdapter
  • Object
show all
Includes:
Instrumentation
Defined in:
lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb

Overview

Abstract adapter for Bolt-based graph databases. Concrete subclasses must provide protocol_handler_class, validate_connection, and execute_cypher. It’s like ActiveRecord::ConnectionAdapter, but for weirdos like me who use graph databases.

Direct Known Subclasses

MemgraphAdapter, Neo4jAdapter

Instance Attribute Summary collapse

Attributes inherited from AbstractAdapter

#config

Instance Method Summary collapse

Methods included from Instrumentation

#instrument, #instrument_connection, #instrument_query, #instrument_transaction, #sanitize_config, #sanitize_params, #sensitive_key?

Methods inherited from AbstractAdapter

#begin_transaction, #commit_transaction, #hydrate_record, #initialize, #inspect, #prepare_params, #process_records, #reconnect, #rollback_transaction

Constructor Details

This class inherits a constructor from ActiveCypher::ConnectionAdapters::AbstractAdapter

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



14
15
16
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 14

def connection
  @connection
end

Instance Method Details

#active?Boolean Also known as: connected?

Connection health check. If this returns false, you’re probably in trouble.

Returns:

  • (Boolean)


69
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 69

def active? = @connection&.connected?

#connectObject

Establish a connection if not already active. This includes auth token prep, URI parsing, and quiet suffering.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 25

def connect
  return true if active?

  instrument_connection(:connect, config) do
    # Determine host and port from config
    host, port = if config[:uri]
                   # Legacy URI format
                   uri = URI(config[:uri])
                   [uri.host, uri.port || 7687]
                 else
                   # New URL format via ConnectionUrlResolver
                   [config[:host] || 'localhost', config[:port] || 7687]
                 end

    # Prepare auth token
    auth = if config[:username]
             { scheme: 'basic', principal: config[:username], credentials: config[:password] }
           else
             { scheme: 'none' }
           end

    # Get SSL connection params
    ssl_params = if config[:url]
                   resolver = ActiveCypher::ConnectionUrlResolver.new(config[:url])
                   resolver.ssl_connection_params
                 else
                   {
                     secure: config[:ssl] ? true : false,
                     verify_cert: config[:ssc] ? false : true
                   }
                 end

    @connection = Bolt::Connection.new(
      host, port, self,
      auth_token: auth,
      timeout_seconds: config.fetch(:timeout, 15),
      **ssl_params
    )
    @connection.connect
    validate_connection
  end
end

#convert_access_mode(mode) ⇒ Object

Convert access mode to database-specific format



97
98
99
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 97

def convert_access_mode(mode)
  mode.to_s # Default implementation
end

#create_protocol_handler(connection) ⇒ Object

Create a protocol handler for the connection



107
108
109
110
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 107

def create_protocol_handler(connection)
  protocol_handler_class.new(connection)
  # Return handler for connection to store
end

#disconnectObject

Clean disconnection. Resets the internal state.



73
74
75
76
77
78
79
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 73

def disconnect
  instrument_connection(:disconnect) do
    @connection.close if @connection
    @connection = nil
    true
  end
end

#prepare_tx_metadata(metadata, _db, _access_mode) ⇒ Object

Prepare transaction metadata with database-specific attributes



102
103
104
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 102

def (, _db, _access_mode)
   # Default implementation
end

#raw_connectionObject

Returns the raw Bolt connection object This is useful for accessing low-level connection methods like read_transaction, write_transaction, async_read_transaction, etc.



19
20
21
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 19

def raw_connection
  @connection
end

#reset!Boolean

Reset the connection state by sending a RESET message. This clears any pending work and returns the connection to a clean state. Useful for error recovery or connection pooling.

Returns:

  • (Boolean)

    true if reset succeeded, false otherwise



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 117

def reset!
  return false unless active?

  instrument_connection(:reset, config) do
    # Wrap in async to handle the connection reset properly
    result = nil
    error = nil

    Async do
      begin
        # Try to execute a simple query first
        session = Bolt::Session.new(@connection)
        session.run('RETURN 1 AS check', {})
        session.close
        result = true
      rescue StandardError => e
        # Query failed, need to reset the connection
        logger.debug { "Connection needs reset: #{e.message}" }

        # Send RESET message directly
        begin
          @connection.write_message(Bolt::Messaging::Reset.new)
          response = @connection.read_message
          result = response.is_a?(Bolt::Messaging::Success)
          logger.debug { "Reset response: #{response.class}" }
        rescue StandardError => reset_error
          logger.error { "Reset failed: #{reset_error.message}" }
          result = false
        end
      end
    rescue StandardError => e
      error = e
    end.wait

    raise error if error

    result
  end
rescue StandardError => e
  # This is madness!
  logger.error { "Failed to reset connection: #{e.message}" }
  false
end

#run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write) ⇒ Object

Runs a Cypher query via Bolt session. Automatically handles connect, logs query, cleans up session. Very adult.



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 83

def run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write)
  connect
  logger.debug { "[#{context}] #{cypher} #{params.inspect}" }

  instrument_query(cypher, params, context: context, metadata: { db: db, access_mode: access_mode }) do
    session = Bolt::Session.new(connection, database: db)
    result  = session.run(cypher, prepare_params(params), mode: access_mode)
    rows    = result.respond_to?(:to_a) ? result.to_a : result
    session.close
    rows
  end
end