Class: ActiveCypher::ConnectionAdapters::AbstractBoltAdapter

Inherits:
AbstractAdapter
  • Object
show all
Includes:
Instrumentation
Defined in:
lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb

Overview

Abstract adapter for Bolt-based graph databases. Concrete subclasses must provide protocol_handler_class, validate_connection, and execute_cypher. It’s like ActiveRecord::ConnectionAdapter, but for weirdos like me who use graph databases.

Direct Known Subclasses

MemgraphAdapter, Neo4jAdapter

Instance Attribute Summary collapse

Attributes inherited from AbstractAdapter

#config

Instance Method Summary collapse

Methods included from Instrumentation

#instrument, #instrument_connection, #instrument_query, #instrument_transaction, #sanitize_config, #sanitize_params, #sensitive_key?

Methods inherited from AbstractAdapter

#begin_transaction, #commit_transaction, #hydrate_record, #initialize, #inspect, #prepare_params, #process_records, #reconnect, #rollback_transaction

Constructor Details

This class inherits a constructor from ActiveCypher::ConnectionAdapters::AbstractAdapter

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



14
15
16
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 14

def connection
  @connection
end

#driverObject (readonly)

Returns the value of attribute driver.



14
15
16
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 14

def driver
  @driver
end

Instance Method Details

#active?Boolean Also known as: connected?

Connection health check. If this returns false, you’re probably in trouble.

Returns:

  • (Boolean)


82
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 82

def active? = @connection&.connected?

#async_with_session(**kw) {|session| ... } ⇒ Async::Task

Asynchronously yields a Session from the connection pool. Each call acquires its own connection, making it safe for concurrent fibers.

Yield Parameters:

Returns:

  • (Async::Task)

    A task that resolves to the block’s result



111
112
113
114
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 111

def async_with_session(**kw, &block)
  connect
  @driver.async_with_session(**kw, &block)
end

#connectObject

Establish a connection if not already active. This includes auth token prep, URI parsing, and quiet suffering.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 26

def connect
  return true if active?

  instrument_connection(:connect, config) do
    # Determine host and port from config
    host, port = if config[:uri]
                   # Legacy URI format
                   uri = URI(config[:uri])
                   [uri.host, uri.port || 7687]
                 else
                   # New URL format via ConnectionUrlResolver
                   [config[:host] || 'localhost', config[:port] || 7687]
                 end

    # Prepare auth token
    auth = if config[:username]
             { scheme: 'basic', principal: config[:username], credentials: config[:password] }
           else
             { scheme: 'none' }
           end

    # Get SSL connection params
    ssl_params = if config[:url]
                   resolver = ActiveCypher::ConnectionUrlResolver.new(config[:url])
                   resolver.ssl_connection_params
                 else
                   {
                     secure: config[:ssl] ? true : false,
                     verify_cert: config[:ssc] ? false : true
                   }
                 end

    # Create the driver with connection pool for concurrent operations
    @driver = Bolt::Driver.new(
      uri: "bolt://#{host}:#{port}",
      adapter: self,
      auth_token: auth,
      pool_size: config.fetch(:pool_size, 10),
      secure: ssl_params[:secure],
      verify_cert: ssl_params[:verify_cert]
    )

    # Also create a single connection for backwards compatibility
    # This connection is used for simple synchronous operations
    @connection = Bolt::Connection.new(
      host, port, self,
      auth_token: auth,
      timeout_seconds: config.fetch(:timeout, 15),
      **ssl_params
    )
    @connection.connect
    validate_connection
  end
end

#convert_access_mode(mode) ⇒ Object

Convert access mode to database-specific format



132
133
134
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 132

def convert_access_mode(mode)
  mode.to_s # Default implementation
end

#create_protocol_handler(connection) ⇒ Object

Create a protocol handler for the connection



152
153
154
155
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 152

def create_protocol_handler(connection)
  protocol_handler_class.new(connection)
  # Return handler for connection to store
end

#disconnectObject

Clean disconnection. Resets the internal state.



86
87
88
89
90
91
92
93
94
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 86

def disconnect
  instrument_connection(:disconnect) do
    @driver&.close
    @driver = nil
    @connection&.close
    @connection = nil
    true
  end
end

#ensure_schema_migration_constraintObject

Ensure schema migration constraint exists for tracking migrations. Override in subclasses for database-specific syntax.



138
139
140
141
142
143
144
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 138

def ensure_schema_migration_constraint
  execute_cypher(<<~CYPHER, {}, 'SchemaMigration')
    CREATE CONSTRAINT graph_schema_migration IF NOT EXISTS
    FOR (m:SchemaMigration)
    REQUIRE m.version IS UNIQUE
  CYPHER
end

#prepare_tx_metadata(metadata, _db, _access_mode) ⇒ Object

Prepare transaction metadata with database-specific attributes



147
148
149
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 147

def (, _db, _access_mode)
   # Default implementation
end

#raw_connectionObject

Returns the raw Bolt connection object This is useful for accessing low-level connection methods like read_transaction, write_transaction, etc. NOTE: For concurrent async operations, use with_session or async_with_session instead.



20
21
22
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 20

def raw_connection
  @connection
end

#reset!Boolean

Reset the connection state by sending a RESET message. This clears any pending work and returns the connection to a clean state. Useful for error recovery or connection pooling.

Returns:

  • (Boolean)

    true if reset succeeded, false otherwise



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 162

def reset!
  return false unless active?

  instrument_connection(:reset, config) do
    # Use Sync for efficient synchronous execution within async context
    result = nil
    error = nil

    begin
      result = Sync do
        begin
          # Try to execute a simple query first
          session = Bolt::Session.new(@connection)
          session.run('RETURN 1 AS check', {})
          session.close
          true
        rescue StandardError => e
          # Query failed, need to reset the connection
          logger.debug { "Connection needs reset: #{e.message}" }

          # Send RESET message directly
          begin
            @connection.write_message(Bolt::Messaging::Reset.new)
            response = @connection.read_message
            logger.debug { "Reset response: #{response.class}" }
            response.is_a?(Bolt::Messaging::Success)
          rescue StandardError => reset_error
            logger.error { "Reset failed: #{reset_error.message}" }
            false
          end
        end
      end
    rescue StandardError => e
      error = e
    end

    raise error if error

    result
  end
rescue StandardError => e
  # This is madness!
  logger.error { "Failed to reset connection: #{e.message}" }
  false
end

#run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write) ⇒ Object

Runs a Cypher query via Bolt session. Automatically handles connect, logs query, cleans up session. Very adult.



118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 118

def run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write)
  connect
  logger.debug { "[#{context}] #{cypher} #{params.inspect}" }

  instrument_query(cypher, params, context: context, metadata: { db: db, access_mode: access_mode }) do
    session = Bolt::Session.new(connection, database: db)
    result  = session.run(cypher, prepare_params(params), mode: access_mode)
    rows    = result.respond_to?(:to_a) ? result.to_a : result
    session.close
    rows
  end
end

#with_session(**kw) {|session| ... } ⇒ Object

Yields a Session from the connection pool. Safe for concurrent use. Each call acquires its own connection from the pool.

Yield Parameters:

Returns:

  • (Object)

    The result of the block



101
102
103
104
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 101

def with_session(**kw, &block)
  connect
  @driver.with_session(**kw, &block)
end