Class: ActiveCypher::ConnectionAdapters::MemgraphAdapter

Inherits:
AbstractBoltAdapter show all
Defined in:
lib/active_cypher/connection_adapters/memgraph_adapter.rb

Defined Under Namespace

Modules: Persistence Classes: ProtocolHandler

Constant Summary collapse

ID_FUNCTION =

Use id() for Memgraph instead of elementId()

'id'

Instance Attribute Summary

Attributes inherited from AbstractBoltAdapter

#connection, #driver

Attributes inherited from AbstractAdapter

#config

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from AbstractBoltAdapter

#active?, #async_with_session, #connect, #create_protocol_handler, #disconnect, #raw_connection, #reset!, #with_session

Methods included from Instrumentation

#instrument, #instrument_connection, #instrument_query, #instrument_transaction, #sanitize_config, #sanitize_params, #sensitive_key?

Methods inherited from AbstractAdapter

#active?, #begin_transaction, #commit_transaction, #connect, #disconnect, #initialize, #inspect, #process_records, #reconnect, #rollback_transaction

Constructor Details

This class inherits a constructor from ActiveCypher::ConnectionAdapters::AbstractAdapter

Class Method Details

.id_function ⇒ Object



57
58
59
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 57

# Name of the Cypher function this adapter uses to read an internal ID.
#
# @return [String] always 'id'
def self.id_function = 'id'

.node_id_equals_value(alias_name, value) ⇒ Object



49
50
51
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 49

# Builds a Cypher predicate comparing a node's id() against a literal value
# that is interpolated directly into the query text.
#
# @param alias_name [#to_s] the node alias used in the query
# @param value [#to_s] the value placed on the right-hand side
# @return [String] e.g. "id(n) = 42"
def self.node_id_equals_value(alias_name, value)
  id_call = "id(#{alias_name})"
  "#{id_call} = #{value}"
end

.node_id_where(alias_name, param_name = nil) ⇒ Object

Additional helper methods for nodes



41
42
43
44
45
46
47
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 41

# Builds the id() expression for a node alias, optionally compared against a
# named query parameter.
#
# @param alias_name [#to_s] the node alias used in the query
# @param param_name [#to_s, nil] parameter name (without the leading "$");
#   when falsy only the bare id() call is returned
# @return [String] "id(alias) = $param" or "id(alias)"
def self.node_id_where(alias_name, param_name = nil)
  id_call = "id(#{alias_name})"
  return id_call unless param_name

  "#{id_call} = $#{param_name}"
end

.return_id ⇒ Object



36
37
38
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 36

# Cypher RETURN fragment that projects id(r) under the alias "rid".
#
# @return [String] always 'id(r) AS rid'
def self.return_id = 'id(r) AS rid'

.return_node_id(alias_name, as_name = 'internal_id') ⇒ Object



53
54
55
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 53

# Cypher RETURN fragment that projects a node's id() under a chosen alias.
#
# @param alias_name [#to_s] the node alias used in the query
# @param as_name [#to_s] the column name for the projected ID
# @return [String] e.g. "id(n) AS internal_id"
def self.return_node_id(alias_name, as_name = 'internal_id')
  projection = "id(#{alias_name})"
  "#{projection} AS #{as_name}"
end

.with_direct_id(id) ⇒ Object

Helper methods for Cypher query generation with IDs



20
21
22
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 20

# Cypher predicate matching alias `r` by an ID interpolated directly into
# the query text (no parameter binding).
#
# @param id [#to_s] the ID value to embed
# @return [String] e.g. "id(r) = 5"
def self.with_direct_id(id)
  target = 'id(r)'
  "#{target} = #{id}"
end

.with_direct_node_ids(a_id, b_id) ⇒ Object



28
29
30
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 28

# Cypher predicate matching aliases `p` and `h` by IDs interpolated directly
# into the query text.
#
# @param a_id [#to_s] ID compared against id(p)
# @param b_id [#to_s] ID compared against id(h)
# @return [String] e.g. "id(p) = 1 AND id(h) = 2"
def self.with_direct_node_ids(a_id, b_id)
  clauses = ["id(p) = #{a_id}", "id(h) = #{b_id}"]
  clauses.join(' AND ')
end

.with_param_id ⇒ Object



24
25
26
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 24

# Cypher predicate matching alias `r` against the `$id` query parameter.
#
# @return [String] always 'id(r) = $id'
def self.with_param_id = 'id(r) = $id'

.with_param_node_ids ⇒ Object



32
33
34
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 32

# Cypher predicate matching aliases `p` and `h` against the `$from_id` and
# `$to_id` query parameters.
#
# @return [String] always 'id(p) = $from_id AND id(h) = $to_id'
def self.with_param_node_ids = 'id(p) = $from_id AND id(h) = $to_id'

Instance Method Details

#convert_access_mode(mode) ⇒ Object

Implement database-specific methods for Memgraph



191
192
193
194
195
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 191

# Converts an access mode to its string form. The value is passed through
# unchanged apart from the String conversion; the hook exists so this adapter
# exposes the same method as the others.
#
# @param mode [#to_s] the access mode, e.g. :read or :write
# @return [String] the mode as a String
def convert_access_mode(mode) = mode.to_s

#ensure_schema_migration_constraint ⇒ Object

Memgraph uses different constraint syntax than Neo4j



67
68
69
70
71
72
73
74
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 67

# Creates the uniqueness constraint on SchemaMigration.version via
# execute_ddl. A QueryError whose message contains "already exists" is
# treated as success; any other QueryError is re-raised.
#
# @return [Object, nil] whatever execute_ddl returns, or nil when the
#   constraint was already present
# @raise [ActiveCypher::QueryError] for any failure other than "already exists"
def ensure_schema_migration_constraint
  statement = "CREATE CONSTRAINT ON (m:SchemaMigration) ASSERT m.version IS UNIQUE\n"

  begin
    execute_ddl(statement)
  rescue ActiveCypher::QueryError => e
    # An existing constraint is the desired end state, so swallow that case.
    return if e.message.include?('already exists')

    raise
  end
end

#execute_cypher(cypher, params = {}, ctx = 'Query') ⇒ Object

Memgraph defaults to **implicit auto‑commit** transactions so we simply run the Cypher and return the rows.



181
182
183
184
185
186
187
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 181

# Runs a Cypher statement and returns the processed rows.
#
# Every occurrence of the adapter-neutral placeholder "__NODE_ID__" is
# replaced with "id" before the query is handed to #run.
#
# @param cypher [String] the Cypher text, possibly containing __NODE_ID__
# @param params [Hash] query parameters forwarded to #run
# @param ctx [String] context label forwarded to #run as `context:`
# @return [Object] the result of process_records on the rows from #run
def execute_cypher(cypher, params = {}, ctx = 'Query')
  memgraph_cypher = cypher.gsub('__NODE_ID__', 'id')
  process_records(run(memgraph_cypher, params, context: ctx))
end

#execute_ddl(cypher, params = {}) ⇒ Object

Execute DDL statements (constraints, indexes) without an explicit transaction. Memgraph requires auto-commit for schema manipulation.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 78

# Executes a DDL statement (constraint/index) by sending RUN and PULL
# back-to-back on the raw connection, without a BEGIN/COMMIT wrapper.
#
# Failure handling:
# * If the RUN reply is not a Success, the reply to the already-sent PULL is
#   drained, a RESET is sent, and QueryError is raised.
# * If the PULL reply is a Failure, a RESET is sent and QueryError is raised,
#   so a failed statement is never reported to the caller as a success.
#
# @param cypher [String] the DDL statement
# @param params [Hash] parameters sent with the RUN message
# @return [Object] the reply to the PULL message on success
# @raise [QueryError] when the RUN or the PULL is rejected
def execute_ddl(cypher, params = {})
  connect
  logger.debug { "[DDL] #{cypher}" }

  Sync do
    # Best-effort read: errors while draining must not mask the real failure.
    drain = lambda do
      connection.read_message
    rescue StandardError
      nil
    end

    # Send RESET, drain its reply, then surface the failure.
    abort_ddl = lambda do |response|
      connection.write_message(Bolt::Messaging::Reset.new)
      drain.call
      raise QueryError, "DDL failed for: #{cypher.inspect}\nError: #{response.fields.first}"
    end

    # Send RUN directly without BEGIN/COMMIT wrapper
    connection.write_message(Bolt::Messaging::Run.new(cypher, params, {}))
    connection.write_message(Bolt::Messaging::Pull.new({ n: -1 }))

    run_response = connection.read_message
    unless run_response.is_a?(Bolt::Messaging::Success)
      # PULL was already written, so its reply is still queued; consume it.
      drain.call
      abort_ddl.call(run_response)
    end

    pull_response = connection.read_message
    # A statement can be accepted at RUN and still fail when pulled.
    abort_ddl.call(pull_response) if pull_response.is_a?(Bolt::Messaging::Failure)

    pull_response
  end
end

#hydrate_record(record, node_alias) ⇒ Hash

Hydrates attributes from a Memgraph record

Parameters:

  • record (Hash)

    The raw record from Memgraph

  • node_alias (Symbol)

    The alias used for the node in the query

Returns:

  • (Hash)

    The hydrated attributes



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 207

# Extracts a symbol-keyed attribute hash for one node out of a raw record.
#
# The node is looked up under +node_alias+ and then under its String form.
# Three node shapes are recognised:
# * an Array of at least two elements whose second element is an Array of at
#   least three elements with a Hash in third position (the properties);
# * a plain Hash of properties;
# * any object responding to +properties+.
# Anything else yields no attributes. The record's internal_id (Symbol or
# String key) is always stored under :internal_id, possibly as nil.
#
# @param record [Hash] The raw record from Memgraph
# @param node_alias [Symbol] The alias used for the node in the query
# @return [Hash] The hydrated attributes
def hydrate_record(record, node_alias)
  node = record[node_alias] || record[node_alias.to_s]
  symbolize = lambda do |pairs|
    pairs.each_with_object({}) { |(name, value), out| out[name.to_sym] = value }
  end

  hydrated =
    if node.is_a?(Array) && node.length >= 2
      container = node[1]
      props = container[2] if container.is_a?(Array) && container.length >= 3
      props.is_a?(Hash) ? symbolize.call(props) : {}
    elsif node.is_a?(Hash)
      symbolize.call(node)
    elsif node.respond_to?(:properties)
      node.properties.symbolize_keys
    else
      {}
    end

  hydrated[:internal_id] = record[:internal_id] || record['internal_id']
  hydrated
end

#id_handler ⇒ Object

Return the adapter class as id_handler for compatibility with tests



62
63
64
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 62

# Returns the receiver's class, which is where the ID helper class methods
# (id_function, with_param_id, ...) are defined.
#
# @return [Class] the class of this adapter instance
def id_handler = self.class

#prepare_tx_metadata(metadata, _db, _access_mode) ⇒ Object



197
198
199
200
201
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 197

# Prepares transaction metadata for this adapter.
#
# The database name and access mode are accepted for interface compatibility
# but ignored; the metadata is returned with nil-valued entries removed.
#
# @param metadata [Hash] the transaction metadata
# @param _db [Object] unused
# @param _access_mode [Object] unused
# @return [Hash] a copy of +metadata+ without nil values
def prepare_tx_metadata(metadata, _db, _access_mode)
  # Memgraph doesn't use db or access_mode in metadata
  metadata.compact
end

#run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write) ⇒ Object

Override run to execute queries using auto-commit mode. Memgraph auto-commits each query, so we send RUN + PULL directly without BEGIN/COMMIT wrapper. This avoids transaction state issues.



114
115
116
117
118
119
120
121
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 114

# Runs a Cypher query in auto-commit mode.
#
# Ensures the connection is established, logs the query at debug level, and
# executes it through run_auto_commit inside an instrument_query block.
# +db+ and +access_mode+ are only passed to the instrumentation metadata.
#
# @param cypher [String] the Cypher query
# @param params [Hash] query parameters (passed through prepare_params)
# @param context [String] label used in the log line and instrumentation
# @param db [Object, nil] recorded in instrumentation metadata
# @param access_mode [Symbol] recorded in instrumentation metadata
# @return [Object] whatever the instrument_query block call returns
def run(cypher, params = {}, context: 'Query', db: nil, access_mode: :write)
  connect
  logger.debug do
    "[#{context}] #{cypher} #{params.inspect}"
  end

  query_metadata = { db: db, access_mode: access_mode }
  instrument_query(cypher, params, context: context, metadata: query_metadata) do
    # Parameters are prepared inside the block, i.e. within the instrumented span.
    prepared = prepare_params(params)
    run_auto_commit(cypher, prepared)
  end
end

#run_auto_commit(cypher, params = {}) ⇒ Array<Hash>

Execute a query in auto-commit mode (no explicit transaction). Sends RUN + PULL directly to the connection.

Parameters:

  • cypher (String)

    The Cypher query

  • params (Hash) (defaults to: {})

    Query parameters

Returns:

  • (Array<Hash>)

    The result rows



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 129

# Executes a query in auto-commit mode (no explicit transaction): sends RUN,
# and on success sends PULL and collects every record until the closing
# Success message.
#
# On a Failure reply (to RUN or during PULL) the connection is reset and
# QueryError is raised with the failure's code and message.
#
# @param cypher [String] The Cypher query
# @param params [Hash] Query parameters
# @return [Array<Hash>] The result rows, each keyed by the field names
#   announced in the RUN reply
# @raise [QueryError] when the server answers with a Failure
# @raise [ProtocolError] when an unexpected message type is received
def run_auto_commit(cypher, params = {})
  Sync do
    # Shared failure path: reset the connection, then raise with the
    # server-supplied details.
    fail_query = lambda do |failure|
      code = failure.metadata['code']
      message = failure.metadata['message']
      connection.reset!
      raise QueryError, "Query failed: #{code} - #{message}"
    end

    # Send RUN message
    connection.write_message(Bolt::Messaging::Run.new(cypher, params, {}))

    # Read RUN response
    run_response = connection.read_message

    case run_response
    when Bolt::Messaging::Success
      # Send PULL to get results
      connection.write_message(Bolt::Messaging::Pull.new({ n: -1 }))

      # Field names come from the RUN reply; records only carry values.
      fields = run_response.metadata['fields'] || []
      rows = []

      loop do
        msg = connection.read_message
        case msg
        when Bolt::Messaging::Record
          # Convert record values to hash with field names
          rows << fields.zip(msg.values).to_h
        when Bolt::Messaging::Success
          # End of results
          break
        when Bolt::Messaging::Failure
          fail_query.call(msg)
        else
          raise ProtocolError, "Unexpected response during PULL: #{msg.class}"
        end
      end

      rows
    when Bolt::Messaging::Failure
      fail_query.call(run_response)
    else
      raise ProtocolError, "Unexpected response to RUN: #{run_response.class}"
    end
  end
end

#schema_catalog ⇒ Object



11
12
13
14
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 11

# Fetches the schema by running 'SHOW SCHEMA' and passing the resulting rows
# to parse_schema.
#
# @return [Object] whatever parse_schema returns for the fetched rows
def schema_catalog
  parse_schema(run('SHOW SCHEMA'))
end

#vendor ⇒ Object



9
# File 'lib/active_cypher/connection_adapters/memgraph_adapter.rb', line 9

# Identifies the database vendor this adapter targets.
#
# @return [Symbol] always :memgraph
def vendor
  :memgraph
end