Class: ActiveCypher::ConnectionAdapters::MemgraphAdapter
- Inherits: AbstractBoltAdapter
  - Object
  - AbstractAdapter
  - AbstractBoltAdapter
  - ActiveCypher::ConnectionAdapters::MemgraphAdapter
- Defined in: lib/active_cypher/connection_adapters/memgraph_adapter.rb
Defined Under Namespace
Modules: Persistence
Classes: ProtocolHandler
Constant Summary
- ID_FUNCTION = 'id'
  Use id() for Memgraph instead of elementId()
Instance Attribute Summary
Attributes inherited from AbstractBoltAdapter
Attributes inherited from AbstractAdapter
Class Method Summary
- .id_function ⇒ Object
- .node_id_equals_value(alias_name, value) ⇒ Object
- .node_id_where(alias_name, param_name = nil) ⇒ Object
  Additional helper methods for nodes.
- .return_id ⇒ Object
- .return_node_id(alias_name, as_name = 'internal_id') ⇒ Object
- .with_direct_id(id) ⇒ Object
  Helper methods for Cypher query generation with IDs.
- .with_direct_node_ids(a_id, b_id) ⇒ Object
- .with_param_id ⇒ Object
- .with_param_node_ids ⇒ Object
Instance Method Summary
- #convert_access_mode(mode) ⇒ Object
  Implement database-specific methods for Memgraph.
- #ensure_schema_migration_constraint ⇒ Object
  Memgraph uses different constraint syntax than Neo4j.
- #execute_cypher(cypher, params = {}, ctx = 'Query') ⇒ Object
  Memgraph defaults to **implicit auto‑commit** transactions, so we simply run the Cypher and return the rows.
- #execute_ddl(cypher, params = {}) ⇒ Object
  Execute DDL statements (constraints, indexes) without an explicit transaction. Memgraph requires auto-commit for schema manipulation.
- #hydrate_record(record, node_alias) ⇒ Hash
  Hydrates attributes from a Memgraph record.
- #id_handler ⇒ Object
  Returns the adapter class as id_handler for compatibility with tests.
- #prepare_tx_metadata(metadata, _db, _access_mode) ⇒ Object
- #run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write) ⇒ Object
  Override run to execute queries using auto-commit mode.
- #run_auto_commit(cypher, params = {}) ⇒ Array<Hash>
  Execute a query in auto-commit mode (no explicit transaction).
- #schema_catalog ⇒ Object
- #vendor ⇒ Object
Methods inherited from AbstractBoltAdapter
#active?, #async_with_session, #connect, #create_protocol_handler, #disconnect, #raw_connection, #reset!, #with_session
Methods included from Instrumentation
#instrument, #instrument_connection, #instrument_query, #instrument_transaction, #sanitize_config, #sanitize_params, #sensitive_key?
Methods inherited from AbstractAdapter
#active?, #begin_transaction, #commit_transaction, #connect, #disconnect, #initialize, #inspect, #process_records, #reconnect, #rollback_transaction
Constructor Details
This class inherits a constructor from ActiveCypher::ConnectionAdapters::AbstractAdapter
Class Method Details
.id_function ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 57

    def self.id_function
      'id'
    end
.node_id_equals_value(alias_name, value) ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 49

    def self.node_id_equals_value(alias_name, value)
      "id(#{alias_name}) = #{value}"
    end
.node_id_where(alias_name, param_name = nil) ⇒ Object
Additional helper methods for nodes
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 41

    def self.node_id_where(alias_name, param_name = nil)
      if param_name
        "id(#{alias_name}) = $#{param_name}"
      else
        "id(#{alias_name})"
      end
    end
.return_id ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 36

    def self.return_id
      'id(r) AS rid'
    end
.return_node_id(alias_name, as_name = 'internal_id') ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 53

    def self.return_node_id(alias_name, as_name = 'internal_id')
      "id(#{alias_name}) AS #{as_name}"
    end
.with_direct_id(id) ⇒ Object
Helper methods for Cypher query generation with IDs
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 20

    def self.with_direct_id(id)
      "id(r) = #{id}"
    end
.with_direct_node_ids(a_id, b_id) ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 28

    def self.with_direct_node_ids(a_id, b_id)
      "id(p) = #{a_id} AND id(h) = #{b_id}"
    end
.with_param_id ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 24

    def self.with_param_id
      'id(r) = $id'
    end
.with_param_node_ids ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 32

    def self.with_param_node_ids
      'id(p) = $from_id AND id(h) = $to_id'
    end
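As a rough illustration of how these fragments might be combined into a query, the sketch below copies two of the helper bodies listed above into a stand-in module. `IdHelpers` is a hypothetical name used only so the snippet runs without the gem, and the `Person` label and overall query shape are assumptions rather than anything the adapter prescribes.

```ruby
# Stand-in for the adapter's class-level helpers; bodies copied from the listings above.
# IdHelpers is a hypothetical name, not part of ActiveCypher.
module IdHelpers
  def self.node_id_where(alias_name, param_name = nil)
    param_name ? "id(#{alias_name}) = $#{param_name}" : "id(#{alias_name})"
  end

  def self.return_node_id(alias_name, as_name = 'internal_id')
    "id(#{alias_name}) AS #{as_name}"
  end
end

# Build a parameterised lookup; the Person label is an assumption for illustration.
query = "MATCH (n:Person) WHERE #{IdHelpers.node_id_where('n', 'id')} " \
        "RETURN n, #{IdHelpers.return_node_id('n')}"
puts query
```

The parameterised form keeps the id value out of the query text, whereas the `with_direct_*` helpers interpolate it directly into the string.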
Instance Method Details
#convert_access_mode(mode) ⇒ Object
Implement database-specific methods for Memgraph
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 191

    def convert_access_mode(mode)
      # Memgraph doesn't distinguish between read/write modes
      # but we'll keep the conversion here for consistency
      mode.to_s
    end
#ensure_schema_migration_constraint ⇒ Object
Memgraph uses different constraint syntax than Neo4j
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 67

    def ensure_schema_migration_constraint
      execute_ddl(<<~CYPHER)
        CREATE CONSTRAINT ON (m:SchemaMigration) ASSERT m.version IS UNIQUE
      CYPHER
    rescue ActiveCypher::QueryError => e
      # Ignore if constraint already exists
      raise unless e..include?('already exists')
    end
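The rescue-and-filter pattern used here can be sketched in isolation. This is a minimal sketch, assuming the check is made against the exception's message text (the listing above does not show which attribute is inspected); `QueryError` is a local stand-in class and `ignore_existing` is a hypothetical helper name.

```ruby
# Local stand-in for the gem's error class, so the snippet is self-contained.
class QueryError < StandardError; end

# Hypothetical helper: run a block and swallow only "already exists" failures.
# Assumption: the relevant text is found in the exception's message.
def ignore_existing
  yield
rescue QueryError => e
  # Bare raise re-raises the current exception unless it is the benign case.
  raise unless e.message.include?('already exists')
end

# A duplicate-constraint error is swallowed and the helper returns nil.
result = ignore_existing { raise QueryError, 'Constraint already exists' }
puts result.inspect
```

Any other `QueryError` still propagates, so genuine DDL failures are not hidden.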
#execute_cypher(cypher, params = {}, ctx = 'Query') ⇒ Object
Memgraph defaults to **implicit auto‑commit** transactions so we simply run the Cypher and return the rows.
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 181

    def execute_cypher(cypher, params = {}, ctx = 'Query')
      # Replace adapter-aware placeholder with Memgraph's id function
      # Because Memgraph insists on being different and using id() instead of elementId()
      cypher = cypher.gsub('__NODE_ID__', 'id')
      rows = run(cypher, params, context: ctx)
      process_records(rows)
    end
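The placeholder substitution can be seen on its own with a small example. The query template below is an assumption made up for illustration; only the `__NODE_ID__` token and the `gsub` call come from the method above.

```ruby
# A query template written against the adapter-neutral placeholder (sample text, assumed).
template = 'MATCH (n) WHERE __NODE_ID__(n) = $id RETURN n'

# For Memgraph the placeholder becomes the id() function, as in execute_cypher.
memgraph_cypher = template.gsub('__NODE_ID__', 'id')
puts memgraph_cypher
```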
#execute_ddl(cypher, params = {}) ⇒ Object
Execute DDL statements (constraints, indexes) without an explicit transaction. Memgraph requires auto-commit for schema manipulation.

    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 78

    def execute_ddl(cypher, params = {})
      connect
      logger.debug { "[DDL] #{cypher}" }

      Sync do
        # Send RUN directly without BEGIN/COMMIT wrapper
        connection.(Bolt::Messaging::Run.new(cypher, params, {}))
        connection.(Bolt::Messaging::Pull.new({ n: -1 }))

        # Read responses
        run_response = connection.
        unless run_response.is_a?(Bolt::Messaging::Success)
          # Read any remaining messages to clear connection state
          begin
            connection.
          rescue StandardError
            nil
          end
          # Send RESET to clear connection state
          connection.(Bolt::Messaging::Reset.new)
          begin
            connection.
          rescue StandardError
            nil
          end
          raise QueryError, "DDL failed for: #{cypher.inspect}\nError: #{run_response.fields.first}"
        end

        pull_response = connection.
        pull_response
      end
    end
#hydrate_record(record, node_alias) ⇒ Hash
Hydrates attributes from a Memgraph record
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 207

    def hydrate_record(record, node_alias)
      attrs = {}
      node_data = record[node_alias] || record[node_alias.to_s]

      if node_data.is_a?(Array) && node_data.length >= 2
        properties_container = node_data[1]
        if properties_container.is_a?(Array) && properties_container.length >= 3
          properties = properties_container[2]
          properties.each { |k, v| attrs[k.to_sym] = v } if properties.is_a?(Hash)
        end
      elsif node_data.is_a?(Hash)
        node_data.each { |k, v| attrs[k.to_sym] = v }
      elsif node_data.respond_to?(:properties)
        attrs = node_data.properties.symbolize_keys
      end

      attrs[:internal_id] = record[:internal_id] || record['internal_id']
      attrs
    end
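To make the accepted record shapes concrete, the sketch below reproduces the Array and Hash branches as a standalone function and feeds it two sample records. The function name `hydrate`, the sample values, and the placeholder entries in the nested array are assumptions; the `respond_to?(:properties)` branch is left out because it relies on `symbolize_keys`, which plain Ruby hashes do not provide.

```ruby
# Simplified copy of the Array and Hash branches of hydrate_record, for illustration only.
def hydrate(record, node_alias)
  attrs = {}
  node_data = record[node_alias] || record[node_alias.to_s]

  if node_data.is_a?(Array) && node_data.length >= 2
    container = node_data[1]
    if container.is_a?(Array) && container.length >= 3
      properties = container[2]
      properties.each { |k, v| attrs[k.to_sym] = v } if properties.is_a?(Hash)
    end
  elsif node_data.is_a?(Hash)
    node_data.each { |k, v| attrs[k.to_sym] = v }
  end

  attrs[:internal_id] = record[:internal_id] || record['internal_id']
  attrs
end

# Hash-shaped node data with string keys (sample values are made up).
flat = hydrate({ 'n' => { 'name' => 'Ada' }, 'internal_id' => 7 }, :n)

# Nested-array node data; :tag, 0 and [] are placeholders for positions the method ignores.
nested = hydrate({ n: [:tag, [0, [], { 'name' => 'Ada' }]], internal_id: 7 }, :n)

p flat
p nested
```

Both calls yield the same symbol-keyed attributes, which is the point of the method: callers do not need to know which shape the driver returned.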
#id_handler ⇒ Object
Returns the adapter class (self.class) as the id_handler, for compatibility with tests

    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 62

    def id_handler
      self.class
    end
#prepare_tx_metadata(metadata, _db, _access_mode) ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 197

    def prepare_tx_metadata(metadata, _db, _access_mode)
      # Memgraph doesn't use db or access_mode in metadata
      # but we'll ensure metadata is returned with compact
      metadata.compact
    end
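For reference, `Hash#compact` returns a copy of the hash with nil-valued entries removed. The keys in this sample hash are assumptions chosen for illustration, not fields the adapter is known to send.

```ruby
# Sample transaction metadata; the keys are made up for this example.
metadata = { tx_timeout: 5000, bookmarks: nil }

# compact drops entries whose value is nil and leaves the original hash untouched.
cleaned = metadata.compact
p cleaned
```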
#run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write) ⇒ Object
Override run to execute queries using auto-commit mode. Memgraph auto-commits each query, so we send RUN + PULL directly without BEGIN/COMMIT wrapper. This avoids transaction state issues.
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 114

    def run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write)
      connect
      logger.debug { "[#{context}] #{cypher} #{params.inspect}" }

      instrument_query(cypher, params, context: context, metadata: { db: db, access_mode: access_mode }) do
        run_auto_commit(cypher, prepare_params(params))
      end
    end
#run_auto_commit(cypher, params = {}) ⇒ Array<Hash>
Execute a query in auto-commit mode (no explicit transaction). Sends RUN + PULL directly to the connection.
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 129

    def run_auto_commit(cypher, params = {})
      Sync do
        # Send RUN message
         = {}
        connection.(Bolt::Messaging::Run.new(cypher, params, ))

        # Read RUN response
        run_response = connection.

        case run_response
        when Bolt::Messaging::Success
          # Send PULL to get results
          connection.(Bolt::Messaging::Pull.new({ n: -1 }))

          # Collect records
          rows = []
          fields = run_response.['fields'] || []

          loop do
            msg = connection.
            case msg
            when Bolt::Messaging::Record
              # Convert record values to hash with field names
              row = fields.zip(msg.values).to_h
              rows << row
            when Bolt::Messaging::Success
              # End of results
              break
            when Bolt::Messaging::Failure
              code = msg.['code']
               = msg.['message']
              connection.reset!
              raise QueryError, "Query failed: #{code} - #{}"
            else
              raise ProtocolError, "Unexpected response during PULL: #{msg.class}"
            end
          end

          rows
        when Bolt::Messaging::Failure
          code = run_response.['code']
           = run_response.['message']
          connection.reset!
          raise QueryError, "Query failed: #{code} - #{}"
        else
          raise ProtocolError, "Unexpected response to RUN: #{run_response.class}"
        end
      end
    end
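The row-building step in the PULL loop pairs the field names reported for the query with each record's values. A minimal sketch of that step on its own, with made-up field names and values standing in for what the server would return:

```ruby
# Field names as they might be reported for a query (sample data, assumed).
fields = %w[name internal_id]

# Two records' worth of values, in the same order as the fields (sample data, assumed).
records = [['Ada', 7], ['Grace', 8]]

# zip pairs each field name with the value at the same position; to_h turns the pairs into a Hash.
rows = records.map { |values| fields.zip(values).to_h }
p rows
```

Because the keys come straight from the field list, the resulting hashes are string-keyed, which is why `hydrate_record` checks both symbol and string keys.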
#schema_catalog ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 11

    def schema_catalog
      rows = run('SHOW SCHEMA')
      parse_schema(rows)
    end
#vendor ⇒ Object
    # File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 9

    def vendor = :memgraph