Class: ActiveCypher::ConnectionAdapters::AbstractBoltAdapter
- Inherits: AbstractAdapter
  - Object
  - AbstractAdapter
  - ActiveCypher::ConnectionAdapters::AbstractBoltAdapter
- Includes: Instrumentation
- Defined in: lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb
Overview
Abstract adapter for Bolt-based graph databases. Concrete subclasses must provide protocol_handler_class, validate_connection, and execute_cypher. It’s like ActiveRecord::ConnectionAdapter, but for weirdos like me who use graph databases.
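The contract described above can be sketched without the gem installed. This is a minimal illustration, not the library's code: SketchAbstractBoltAdapter, ExampleDbAdapter, and ProtocolHandler are hypothetical stand-ins, and only the three hook names (protocol_handler_class, validate_connection, execute_cypher) come from this page.

```ruby
# Hypothetical stand-in for the abstract adapter so the sketch runs on its own.
# Only the three hook names are taken from the overview above.
class SketchAbstractBoltAdapter
  REQUIRED_HOOKS = %i[protocol_handler_class validate_connection execute_cypher].freeze

  # Each hook raises until a concrete subclass overrides it.
  REQUIRED_HOOKS.each do |hook|
    define_method(hook) do |*_args|
      raise NotImplementedError, "#{self.class.name} must implement ##{hook}"
    end
  end
end

# Hypothetical concrete adapter for an imaginary database.
class ExampleDbAdapter < SketchAbstractBoltAdapter
  # Hypothetical handler that simply remembers the connection it was given.
  class ProtocolHandler
    attr_reader :connection

    def initialize(connection)
      @connection = connection
    end
  end

  def protocol_handler_class
    ProtocolHandler
  end

  # A real adapter would inspect the server here; the sketch always passes.
  def validate_connection
    true
  end

  # Echoes its arguments instead of talking to a server.
  def execute_cypher(cypher, params = {}, context = 'Query')
    { cypher: cypher.strip, params: params, context: context }
  end
end
```

Calling any of the three hooks on the stand-in base class raises NotImplementedError, which is one common way to make a missing override fail loudly.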
Direct Known Subclasses
Instance Attribute Summary
- #connection ⇒ Object (readonly)
  Returns the value of attribute connection.
- #driver ⇒ Object (readonly)
  Returns the value of attribute driver.
Attributes inherited from AbstractAdapter
Instance Method Summary
- #active? ⇒ Boolean (also: #connected?)
  Connection health check.
- #async_with_session(**kw) {|session| ... } ⇒ Async::Task
  Asynchronously yields a Session from the connection pool.
- #connect ⇒ Object
  Establish a connection if not already active.
- #convert_access_mode(mode) ⇒ Object
  Convert access mode to database-specific format.
- #create_protocol_handler(connection) ⇒ Object
  Create a protocol handler for the connection.
- #disconnect ⇒ Object
  Clean disconnection.
- #ensure_schema_migration_constraint ⇒ Object
  Ensure schema migration constraint exists for tracking migrations.
- #prepare_tx_metadata(metadata, _db, _access_mode) ⇒ Object
  Prepare transaction metadata with database-specific attributes.
- #raw_connection ⇒ Object
  Returns the raw Bolt connection object. This is useful for accessing low-level connection methods like read_transaction, write_transaction, etc.
- #reset! ⇒ Boolean
  Reset the connection state by sending a RESET message.
- #run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write) ⇒ Object
  Runs a Cypher query via Bolt session.
- #with_session(**kw) {|session| ... } ⇒ Object
  Yields a Session from the connection pool.
Methods included from Instrumentation
#instrument, #instrument_connection, #instrument_query, #instrument_transaction, #sanitize_config, #sanitize_params, #sensitive_key?
Methods inherited from AbstractAdapter
#begin_transaction, #commit_transaction, #hydrate_record, #initialize, #inspect, #prepare_params, #process_records, #reconnect, #rollback_transaction
Constructor Details
This class inherits a constructor from ActiveCypher::ConnectionAdapters::AbstractAdapter
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 14
def connection
  @connection
end
#driver ⇒ Object (readonly)
Returns the value of attribute driver.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 14
def driver
  @driver
end
Instance Method Details
#active? ⇒ Boolean Also known as: connected?
Connection health check. If this returns false, you’re probably in trouble.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 82
def active? = @connection&.connected?
#async_with_session(**kw) {|session| ... } ⇒ Async::Task
Asynchronously yields a Session from the connection pool. Each call acquires its own connection, making it safe for concurrent fibers.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 111
def async_with_session(**kw, &block)
  connect
  @driver.async_with_session(**kw, &block)
end
#connect ⇒ Object
Establish a connection if not already active. This includes auth token prep, URI parsing, and quiet suffering.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 26
def connect
  return true if active?

  instrument_connection(:connect, config) do
    # Determine host and port from config
    host, port = if config[:uri]
                   # Legacy URI format
                   uri = URI(config[:uri])
                   [uri.host, uri.port || 7687]
                 else
                   # New URL format via ConnectionUrlResolver
                   [config[:host] || 'localhost', config[:port] || 7687]
                 end

    # Prepare auth token
    auth = if config[:username]
             { scheme: 'basic', principal: config[:username], credentials: config[:password] }
           else
             { scheme: 'none' }
           end

    # Get SSL connection params
    ssl_params = if config[:url]
                   resolver = ActiveCypher::ConnectionUrlResolver.new(config[:url])
                   resolver.ssl_connection_params
                 else
                   {
                     secure: config[:ssl] ? true : false,
                     verify_cert: config[:ssc] ? false : true
                   }
                 end

    # Create the driver with connection pool for concurrent operations
    @driver = Bolt::Driver.new(
      uri: "bolt://#{host}:#{port}",
      adapter: self,
      auth_token: auth,
      pool_size: config.fetch(:pool_size, 10),
      secure: ssl_params[:secure],
      verify_cert: ssl_params[:verify_cert]
    )

    # Also create a single connection for backwards compatibility
    # This connection is used for simple synchronous operations
    @connection = Bolt::Connection.new(
      host, port, self,
      auth_token: auth,
      timeout_seconds: config.fetch(:timeout, 15),
      **ssl_params
    )
    @connection.connect
    validate_connection
  end
end
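To make the config branches in #connect easier to follow, here is a small sketch that mirrors them as a pure function. resolve_bolt_settings and DEFAULT_BOLT_PORT are hypothetical names introduced for illustration; the config keys (:uri, :host, :port, :username, :password, :ssl, :ssc) and the 7687 default are the ones read in the source above. The :url branch is left out because ConnectionUrlResolver's behavior is not shown on this page.

```ruby
require 'uri'

# Hypothetical constant; 7687 is the fallback port used in #connect above.
DEFAULT_BOLT_PORT = 7687

# Hypothetical helper mirroring the host/port, auth, and SSL branches of #connect.
def resolve_bolt_settings(config)
  host, port =
    if config[:uri]
      # Legacy URI format: take host and port from the parsed URI.
      uri = URI(config[:uri])
      [uri.host, uri.port || DEFAULT_BOLT_PORT]
    else
      [config[:host] || 'localhost', config[:port] || DEFAULT_BOLT_PORT]
    end

  auth =
    if config[:username]
      { scheme: 'basic', principal: config[:username], credentials: config[:password] }
    else
      { scheme: 'none' }
    end

  # Only the non-:url branch is reproduced here.
  # Note that :ssc being truthy turns certificate verification off.
  ssl = {
    secure: config[:ssl] ? true : false,
    verify_cert: config[:ssc] ? false : true
  }

  { host: host, port: port, auth: auth, ssl: ssl }
end
```

With an empty config the sketch falls back to localhost, port 7687, the 'none' auth scheme, and an unencrypted connection with certificate verification left on.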
#convert_access_mode(mode) ⇒ Object
Convert access mode to database-specific format.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 132
def convert_access_mode(mode)
  mode.to_s # Default implementation
end
#create_protocol_handler(connection) ⇒ Object
Create a protocol handler for the connection.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 152
def create_protocol_handler(connection)
  protocol_handler_class.new(connection)
  # Return handler for connection to store
end
#disconnect ⇒ Object
Clean disconnection. Resets the internal state.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 86
def disconnect
  instrument_connection(:disconnect) do
    @driver&.close
    @driver = nil
    @connection&.close
    @connection = nil
    true
  end
end
#ensure_schema_migration_constraint ⇒ Object
Ensure schema migration constraint exists for tracking migrations. Override in subclasses for database-specific syntax.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 138
def ensure_schema_migration_constraint
  execute_cypher(<<~CYPHER, {}, 'SchemaMigration')
    CREATE CONSTRAINT graph_schema_migration IF NOT EXISTS
    FOR (m:SchemaMigration)
    REQUIRE m.version IS UNIQUE
  CYPHER
end
#prepare_tx_metadata(metadata, _db, _access_mode) ⇒ Object
Prepare transaction metadata with database-specific attributes.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 147
def prepare_tx_metadata(metadata, _db, _access_mode)
  # Default implementation
end
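The two hooks convert_access_mode and prepare_tx_metadata exist so that subclasses can adapt transaction details to a particular database. The following is a hedged sketch of how an override might look. SketchBase and SketchShortModeAdapter are hypothetical stand-ins, the pass-through default for prepare_tx_metadata is an assumption made for this sketch, and the 'r'/'w' mapping and the :db and :mode keys are illustrative choices rather than anything this page documents.

```ruby
# Hypothetical stand-in for the abstract adapter's two hooks.
class SketchBase
  # Matches the documented default: the mode is simply stringified.
  def convert_access_mode(mode)
    mode.to_s
  end

  # ASSUMPTION: the default passes metadata through unchanged.
  def prepare_tx_metadata(metadata, _db, _access_mode)
    metadata
  end
end

# Hypothetical adapter for a database that wants short mode codes.
class SketchShortModeAdapter < SketchBase
  # Illustrative mapping: anything other than read is treated as write.
  def convert_access_mode(mode)
    mode.to_s == 'read' ? 'r' : 'w'
  end

  # Adds database name and converted mode without mutating the caller's hash.
  def prepare_tx_metadata(metadata, db, access_mode)
    extra = metadata.dup
    extra[:db] = db if db
    extra[:mode] = convert_access_mode(access_mode)
    extra
  end
end
```

Duplicating the metadata hash before adding keys keeps the caller's hash untouched, which is a design choice of the sketch and not a documented guarantee of the library.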
#raw_connection ⇒ Object
Returns the raw Bolt connection object. This is useful for accessing low-level connection methods such as read_transaction and write_transaction. NOTE: For concurrent async operations, use with_session or async_with_session instead.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 20
def raw_connection
  @connection
end
#reset! ⇒ Boolean
Reset the connection state by sending a RESET message. This clears any pending work and returns the connection to a clean state. Useful for error recovery or connection pooling.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 162
def reset!
  return false unless active?

  instrument_connection(:reset, config) do
    # Use Sync for efficient synchronous execution within async context
    result = nil
    error = nil

    begin
      result = Sync do
        begin
          # Try to execute a simple query first
          session = Bolt::Session.new(@connection)
          session.run('RETURN 1 AS check', {})
          session.close
          true
        rescue StandardError => e
          # Query failed, need to reset the connection
          logger.debug { "Connection needs reset: #{e.}" }

          # Send RESET message directly
          begin
            @connection.(Bolt::Messaging::Reset.new)
            response = @connection.
            logger.debug { "Reset response: #{response.class}" }
            response.is_a?(Bolt::Messaging::Success)
          rescue StandardError => reset_error
            logger.error { "Reset failed: #{reset_error.}" }
            false
          end
        end
      end
    rescue StandardError => e
      error = e
    end

    raise error if error

    result
  end
rescue StandardError => e
  # This is madness!
  logger.error { "Failed to reset connection: #{e.}" }
  false
end
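The control flow of reset! is "probe first, send RESET only if the probe fails, and report false if the RESET itself fails". A stripped-down sketch of that flow, using a fake connection so it runs without a server, might look like this. FakeConnection, probe, send_reset, and reset_with_probe are all hypothetical names for illustration and are not the library's API.

```ruby
# Hypothetical stand-in for a Bolt connection; none of these methods are real library calls.
class FakeConnection
  attr_reader :resets

  def initialize(healthy:, reset_succeeds: true)
    @healthy = healthy
    @reset_succeeds = reset_succeeds
    @resets = 0
  end

  # Plays the role of the 'RETURN 1 AS check' probe query.
  def probe
    raise IOError, 'connection is in a failed state' unless @healthy

    true
  end

  # Plays the role of sending RESET and reading a success response.
  def send_reset
    @resets += 1
    raise IOError, 'reset rejected' unless @reset_succeeds

    @healthy = true
  end
end

# Probe first; fall back to RESET only when the probe raises.
# Returns true when the connection is usable afterwards, false otherwise.
def reset_with_probe(conn)
  conn.probe
rescue StandardError
  begin
    conn.send_reset
  rescue StandardError
    false
  end
end
```

A healthy fake connection returns true without a RESET being counted, an unhealthy one is recovered with exactly one RESET, and a connection whose RESET is rejected yields false rather than raising.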
#run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write) ⇒ Object
Runs a Cypher query via Bolt session. Automatically handles connect, logs query, cleans up session. Very adult.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 118
def run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write)
  connect
  logger.debug { "[#{context}] #{cypher} #{params.inspect}" }

  instrument_query(cypher, params, context: context, metadata: { db: db, access_mode: access_mode }) do
    session = Bolt::Session.new(connection, database: db)
    result = session.run(cypher, prepare_params(params), mode: access_mode)
    rows = result.respond_to?(:to_a) ? result.to_a : result
    session.close
    rows
  end
end
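Note the order inside #run: the result is converted with to_a before the session is closed. One plausible reason, assuming the result may be lazy (this page does not say whether it is), is that rows can no longer be read once the session is gone. The sketch below illustrates that ordering with a hypothetical FakeSession and a hypothetical run_and_collect helper.

```ruby
# Hypothetical session whose results are lazy and only readable while it is open.
class FakeSession
  def initialize
    @open = true
  end

  # Returns a lazy enumerator; rows are produced only when it is consumed.
  def run(_cypher)
    Enumerator.new do |rows|
      raise IOError, 'session already closed' unless @open

      rows << { 'n' => 1 }
      rows << { 'n' => 2 }
    end
  end

  def close
    @open = false
  end
end

# Same ordering as #run above: materialize rows, then close, then return.
def run_and_collect(session, cypher)
  result = session.run(cypher)
  rows = result.respond_to?(:to_a) ? result.to_a : result
  session.close
  rows
end
```

In this sketch, consuming the enumerator after close raises IOError, while run_and_collect returns both rows because it materializes them first.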
#with_session(**kw) {|session| ... } ⇒ Object
Yields a Session from the connection pool. Safe for concurrent use. Each call acquires its own connection from the pool.
# File 'lib/active_cypher/connection_adapters/abstract_bolt_adapter.rb', line 101
def with_session(**kw, &block)
  connect
  @driver.with_session(**kw, &block)
end
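The claim that each call acquires its own connection from the pool can be illustrated with a toy pool built on Ruby's standard Queue. This is only a sketch of the checkout/checkin idea: SketchPool is hypothetical, the strings stand in for real connections, and Bolt::Driver's actual pool implementation is not shown on this page.

```ruby
# Hypothetical toy pool; strings stand in for real connections.
class SketchPool
  def initialize(size)
    @available = Queue.new
    size.times { |i| @available << "conn-#{i}" }
  end

  # Number of connections currently checked in.
  def available
    @available.size
  end

  # Checks out one connection for the duration of the block.
  # Queue#pop blocks when the pool is empty, so callers wait instead of sharing.
  def with_session
    conn = @available.pop
    yield conn
  ensure
    @available << conn if conn
  end
end
```

Two overlapping with_session calls on a pool of size 2 receive different connections, and both are checked back in once their blocks finish, even if a block raises.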