Class: ActiveCypher::Bolt::Session

Inherits:
Object
  • Object
show all
Includes:
Instrumentation
Defined in:
lib/active_cypher/bolt/session.rb

Overview

A Session is the primary unit of work in the Bolt Protocol. It maintains a connection to the database server and allows running queries.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Instrumentation

#instrument, #instrument_connection, #instrument_query, #instrument_transaction, #sanitize_config, #sanitize_params, #sensitive_key?

Constructor Details

#initialize(connection, database: nil) ⇒ Session

Returns a new instance of Session.



13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/active_cypher/bolt/session.rb', line 13

# Builds a session on top of an existing connection.
#
# @param connection [Object] the connection this session talks through;
#   it must respond to +adapter+
# @param database [String, nil] the default database name for this session
def initialize(connection, database: nil)
  @connection = connection

  # Memgraph has no multi-database support, so a requested name is dropped.
  on_memgraph = connection.adapter.is_a?(ConnectionAdapters::MemgraphAdapter)
  @database = on_memgraph ? nil : database

  @current_transaction = nil
  @bookmarks = []
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



11
12
13
# File 'lib/active_cypher/bolt/session.rb', line 11

# @return [Object] the connection handed to this session at construction
def connection = @connection

#database ⇒ Object (readonly)

Returns the value of attribute database.



11
12
13
# File 'lib/active_cypher/bolt/session.rb', line 11

# @return [String, nil] the session's default database name, if any
def database = @database

Instance Method Details

#async_read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object



159
160
161
# File 'lib/active_cypher/bolt/session.rb', line 159

# Convenience wrapper: runs the block through #async_run_transaction with
# the access mode fixed to :read.
#
# @param db [String, nil] database name, forwarded unchanged
# @param timeout [Integer, nil] transaction timeout, forwarded unchanged
# @param metadata [Hash, nil] transaction metadata, forwarded unchanged
# @return [Object] whatever #async_run_transaction returns
def async_read_transaction(db: nil, timeout: nil, metadata: nil, &)
  async_run_transaction(:read, db: db, timeout: timeout, metadata: metadata, &)
end

#async_run_transaction(mode = :write, db: nil, timeout: nil, metadata: nil) {|tx| ... } ⇒ Async::Task

Asynchronously execute a block of code within a transaction. This method is asynchronous and will return an Async::Task that will complete when the transaction is finished.

Parameters:

  • mode (Symbol) (defaults to: :write)

    The access mode (:read or :write).

  • db (String) (defaults to: nil)

    The database name to run the transaction against.

  • timeout (Integer) (defaults to: nil)

    Transaction timeout in milliseconds.

  • metadata (Hash) (defaults to: nil)

    Transaction metadata to send to the server.

Yields:

  • (tx)

    The transaction to use for queries.

Returns:

  • (Async::Task)

    A task that will complete with the result of the block.



138
139
140
141
142
143
144
145
# File 'lib/active_cypher/bolt/session.rb', line 138

# Runs the given block inside a transaction on a new Async task.
#
# @param mode [Symbol] the access mode (:read or :write)
# @param db [String, nil] the database name to run the transaction against
# @param timeout [Integer, nil] transaction timeout in milliseconds
# @param metadata [Hash, nil] transaction metadata to send to the server
# @yield [tx] the transaction to use for queries
# @return [Async::Task] the task wrapping the transaction block
# @raise [RuntimeError] when called outside of a running Async task
def async_run_transaction(mode = :write, db: nil, timeout: nil, metadata: nil, &block)
  # Ensure we are in an async task, otherwise the behavior is undefined.
  raise 'Cannot run an async transaction outside of an Async task' unless Async::Task.current?

  # The block is named rather than anonymous because it is forwarded from
  # inside a nested block, which some Ruby 3.3 builds reject for a bare `&`.
  Async do
    _execute_transaction_block(mode, db, timeout, metadata, &block)
  end
end

#async_write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object



155
156
157
# File 'lib/active_cypher/bolt/session.rb', line 155

# Convenience wrapper: runs the block through #async_run_transaction with
# the access mode fixed to :write.
#
# @param db [String, nil] database name, forwarded unchanged
# @param timeout [Integer, nil] transaction timeout, forwarded unchanged
# @param metadata [Hash, nil] transaction metadata, forwarded unchanged
# @return [Object] whatever #async_run_transaction returns
def async_write_transaction(db: nil, timeout: nil, metadata: nil, &)
  async_run_transaction(:write, db: db, timeout: timeout, metadata: metadata, &)
end

#begin_transaction(db: nil, access_mode: :write, tx_timeout: nil, tx_metadata: nil) ⇒ Transaction

Begin a new transaction.

Parameters:

  • db (String) (defaults to: nil)

    The database name to begin the transaction against.

  • access_mode (Symbol) (defaults to: :write)

    The access mode (:read or :write).

  • tx_timeout (Integer) (defaults to: nil)

    Transaction timeout in milliseconds.

  • tx_metadata (Hash) (defaults to: nil)

    Transaction metadata to send to the server.

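Returns:

  • (Transaction)

    The newly started transaction.

Raises:

  • (ConnectionError)

    If a transaction is already active on this session.

  • (QueryError)

    If the server answers BEGIN with a failure.

  • (ProtocolError)

    If the server answers BEGIN with an unexpected message type.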



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/active_cypher/bolt/session.rb', line 58

# Begin a new explicit transaction by sending a BEGIN message and waiting
# for the server's reply.
#
# @param db [String, nil] the database name to begin the transaction against
# @param access_mode [Symbol] the access mode (:read or :write)
# @param tx_timeout [Integer, nil] transaction timeout in milliseconds
# @param tx_metadata [Hash, nil] transaction metadata to send to the server
# @return [Transaction] the transaction that is now current for this session
# @raise [ConnectionError] if a transaction is already active
# @raise [QueryError] if the server replies with a failure
# @raise [ProtocolError] if the server replies with any other message type
def begin_transaction(db: nil, access_mode: :write, tx_timeout: nil, tx_metadata: nil)
  raise ConnectionError, 'Already in a transaction' if @current_transaction&.active?

  # Payload for instrumentation only; it is not what goes over the wire.
  metadata = { access_mode: access_mode }
  metadata[:db] = db if db
  metadata[:timeout] = tx_timeout if tx_timeout

  instrument_transaction(:begin, nil, metadata: metadata) do
    # Build the metadata map carried by the BEGIN message.
    begin_meta = {}
    if @connection.adapter.is_a?(ConnectionAdapters::MemgraphAdapter)
      # Memgraph does not support named databases: never send 'db'.
      begin_meta['adapter'] = 'memgraph'
    elsif db || @database
      # An explicit db wins over the session default.
      begin_meta['db'] = db || @database
    end
    begin_meta['mode'] = access_mode == :read ? 'r' : 'w'
    begin_meta['tx_timeout'] = tx_timeout if tx_timeout
    begin_meta['tx_metadata'] = tx_metadata if tx_metadata
    begin_meta['bookmarks'] = @bookmarks if @bookmarks&.any?

    begin_msg = Messaging::Begin.new(begin_meta)
    @connection.write_message(begin_msg)

    # Read response to BEGIN
    response = @connection.read_message

    case response
    when Messaging::Success
      # BEGIN succeeded, create a new transaction
      @current_transaction = Transaction.new(self, @bookmarks, response.metadata)
    when Messaging::Failure
      # BEGIN failed: capture the details, reset the connection, then raise.
      code = response.metadata['code']
      message = response.metadata['message']
      @connection.reset!
      raise QueryError, "Failed to begin transaction: #{code} - #{message}"
    else
      raise ProtocolError, "Unexpected response to BEGIN: #{response.class}"
    end
  end
end

#close ⇒ Object

Close the session and any active transaction.



164
165
166
167
168
169
170
171
172
# File 'lib/active_cypher/bolt/session.rb', line 164

# Closes the session: rolls back a still-active transaction, then detaches
# whatever transaction is left on the session. Instrumented as
# 'session.close'.
def close
  instrument('session.close') do
    open_tx = @current_transaction
    open_tx.rollback if open_tx&.active?

    # Re-read the ivar: the rollback above may already have cleared it.
    leftover = @current_transaction
    complete_transaction(leftover) if leftover
  end
end

#complete_transaction(transaction, new_bookmarks = nil) ⇒ Object

Marks a transaction as completed and removes it from the session.

Parameters:

  • transaction (Transaction)

    The transaction to complete.

  • new_bookmarks (Array<String>) (defaults to: nil)

    New bookmarks to update.



108
109
110
111
112
113
# File 'lib/active_cypher/bolt/session.rb', line 108

# Detaches a finished transaction from the session.
#
# Does nothing unless +transaction+ is the session's current transaction.
#
# @param transaction [Transaction] the transaction to complete
# @param new_bookmarks [Array<String>, nil] bookmarks that replace the
#   session's bookmarks when given
# @return [Array<String>, nil] the bookmarks just stored, or nil when none
#   were stored
def complete_transaction(transaction, new_bookmarks = nil)
  is_current = transaction == @current_transaction
  return unless is_current

  @current_transaction = nil
  return unless new_bookmarks

  @bookmarks = new_bookmarks
end

#read_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object



151
152
153
# File 'lib/active_cypher/bolt/session.rb', line 151

# Convenience wrapper: runs the block through #run_transaction with the
# access mode fixed to :read.
#
# @param db [String, nil] database name, forwarded unchanged
# @param timeout [Integer, nil] transaction timeout, forwarded unchanged
# @param metadata [Hash, nil] transaction metadata, forwarded unchanged
# @return [Object] whatever #run_transaction returns
def read_transaction(db: nil, timeout: nil, metadata: nil, &)
  run_transaction(:read, db: db, timeout: timeout, metadata: metadata, &)
end

#run(query, parameters = {}, mode: :write, db: nil) ⇒ Result

Executes a Cypher query and returns the result.

Parameters:

  • query (String)

    The Cypher query to execute.

  • parameters (Hash) (defaults to: {})

    Parameters for the query.

  • mode (Symbol) (defaults to: :write)

    The access mode (:read or :write).

  • db (String) (defaults to: nil)

    The database name to run the query against.

Returns:

  • (Result)

    The result of the query execution.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/active_cypher/bolt/session.rb', line 34

# Executes a Cypher query, reusing the active transaction when there is one
# and otherwise wrapping the query in its own transaction.
#
# @param query [String] the Cypher query to execute
# @param parameters [Hash] parameters for the query
# @param mode [Symbol] the access mode (:read or :write); only used when a
#   new transaction is opened
# @param db [String, nil] the database name; dropped for Memgraph
# @return [Result] the result of the query execution
def run(query, parameters = {}, mode: :write, db: nil)
  # Memgraph has no named databases, so any requested name is discarded.
  on_memgraph = @connection.adapter.is_a?(ConnectionAdapters::MemgraphAdapter)
  target_db = on_memgraph ? nil : db
  details = { mode: mode, db: target_db }

  instrument_query(query, parameters, context: 'Session#run', metadata: details) do
    # An already-open transaction takes the query directly.
    next @current_transaction.run(query, parameters) if @current_transaction&.active?

    # Auto-transaction mode: the query gets a transaction of its own.
    run_transaction(mode, db: target_db) { |txn| txn.run(query, parameters) }
  end
end

#run_transaction(mode = :write, db: nil, timeout: nil, metadata: nil) {|tx| ... } ⇒ Object

Execute a block of code within a transaction.

Parameters:

  • mode (Symbol) (defaults to: :write)

    The access mode (:read or :write).

  • db (String) (defaults to: nil)

    The database name to run the transaction against.

  • timeout (Integer) (defaults to: nil)

    Transaction timeout in milliseconds.

  • metadata (Hash) (defaults to: nil)

    Transaction metadata to send to the server.

Yields:

  • (tx)

    The transaction to use for queries.

Returns:

  • The result of the block.



123
124
125
126
127
# File 'lib/active_cypher/bolt/session.rb', line 123

# Execute a block of code within a transaction, inside a Sync block.
#
# @param mode [Symbol] the access mode (:read or :write)
# @param db [String, nil] the database name to run the transaction against
# @param timeout [Integer, nil] transaction timeout in milliseconds
# @param metadata [Hash, nil] transaction metadata to send to the server
# @yield [tx] the transaction to use for queries
# @return [Object] the result of the block
def run_transaction(mode = :write, db: nil, timeout: nil, metadata: nil, &block)
  # The block is named rather than anonymous because it is forwarded from
  # inside a nested block, which some Ruby 3.3 builds reject for a bare `&`.
  Sync do
    _execute_transaction_block(mode, db, timeout, metadata, &block)
  end
end

#write_transaction(db: nil, timeout: nil, metadata: nil) ⇒ Object



147
148
149
# File 'lib/active_cypher/bolt/session.rb', line 147

# Convenience wrapper: runs the block through #run_transaction with the
# access mode fixed to :write.
#
# @param db [String, nil] database name, forwarded unchanged
# @param timeout [Integer, nil] transaction timeout, forwarded unchanged
# @param metadata [Hash, nil] transaction metadata, forwarded unchanged
# @return [Object] whatever #run_transaction returns
def write_transaction(db: nil, timeout: nil, metadata: nil, &)
  run_transaction(:write, db: db, timeout: timeout, metadata: metadata, &)
end