Class: ActiveCypher::Bolt::MessageReader

Inherits:
Object
Defined in:
lib/active_cypher/bolt/message_reader.rb

Overview

Handles decoding chunked Packstream data into Bolt messages for Bolt v5.x.
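As an illustration of what "chunked" means here: in Bolt, each chunk starts with a 2-byte big-endian length header, and a zero-length header (0x0000) marks the end of a message. The following is a minimal sketch of that reassembly step, not the class's actual implementation; the helper name read_chunked_message is hypothetical and the private method read_message_chunks used by this class may differ.

```ruby
require 'stringio'

# Hypothetical helper (not part of ActiveCypher): reassembles one message
# from a Bolt-chunked stream and returns its raw bytes.
def read_chunked_message(io)
  message = String.new # empty binary (ASCII-8BIT) buffer
  loop do
    header = io.read(2)
    raise EOFError, 'stream ended inside a chunk header' if header.nil? || header.bytesize < 2

    size = header.unpack1('n') # 16-bit unsigned, big-endian
    break if size.zero?        # 0x0000 marks the end of the message

    chunk = io.read(size)
    raise EOFError, 'stream ended inside a chunk body' if chunk.nil? || chunk.bytesize < size

    message << chunk
  end
  message
end

# Two chunks ("abc" and "de") followed by the end marker.
wire = StringIO.new([3].pack('n') + 'abc' + [2].pack('n') + 'de' + [0].pack('n'))
read_chunked_message(wire) # => "abcde"
```

A bare end marker with no preceding data yields an empty string in this sketch, which is the case read_message guards against with its empty? check.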

Constant Summary

MAX_CHUNK_SIZE = 65_535

Maximum size for a single chunk (unsigned 16-bit int)

READ_TIMEOUT = 15

Seconds to wait for a read operation
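To make the two values concrete, here is a small sketch. The 65_535 limit follows from the chunk header being an unsigned 16-bit integer. How this class applies the read timeout is not shown on this page, so the helper read_with_timeout below is purely hypothetical and uses IO.select as one possible approach; raising IOError on expiry is likewise an assumption.

```ruby
# Hypothetical helper (an assumption, not the library's code): wait up to
# `timeout` seconds for the IO to become readable, then read `nbytes`.
def read_with_timeout(io, nbytes, timeout = 15)
  ready = IO.select([io], nil, nil, timeout)
  raise IOError, "no data within #{timeout}s" if ready.nil?

  io.read(nbytes)
end

# The largest chunk size still fits in the 2-byte header.
max_chunk_size = 65_535
header = [max_chunk_size].pack('n') # two bytes: 0xFF 0xFF
header.unpack1('n')                 # => 65535

# Reading a chunk header from a pipe, with a short timeout for the demo.
reader, writer = IO.pipe
writer.write([3].pack('n'))
read_with_timeout(reader, 2, 0.5).unpack1('n') # => 3
```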

Constructor Details

#initialize(io) ⇒ MessageReader

Returns a new instance of MessageReader.

# File 'lib/active_cypher/bolt/message_reader.rb', line 12

def initialize(io)
  @io = io
  @buffer = StringIO.new(+'', 'rb+') # Internal buffer for chunked data
end

Instance Method Details

#read_message ⇒ Messaging::Message

Reads and decodes the next Bolt message from the stream. Handles Bolt’s chunking mechanism.

Returns:

  • (Messaging::Message)

    The decoded message, or nil if no message bytes were read.

Raises:

  • (ProtocolError)

    If decoding fails or an unknown message type is received.

  • (ConnectionError)

    If the connection is lost during reading.

# File 'lib/active_cypher/bolt/message_reader.rb', line 23

def read_message
  message_bytes = read_message_chunks
  return nil if message_bytes.nil? || message_bytes.empty?

  unpacker = Packstream::Unpacker.new(StringIO.new(message_bytes, 'rb'))
  signature, fields = unpacker.unpack

  klass = find_message_class(signature) or
    raise ProtocolError, "Unknown message signature 0x#{signature.to_s(16)}"

  klass.new(*fields)
rescue EOFError => e
  raise ConnectionError, "Connection closed while reading message: #{e.message}"
rescue ConnectionError
  raise
rescue StandardError => e
  raise ProtocolError, "Failed to decode message: #{e.class} - #{e.message}"
end
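The private find_message_class method is not shown on this page. As a rough sketch of the dispatch step, the lookup could be a table keyed by signature byte. The method names find_message_name and dispatch are hypothetical, and symbols stand in for the Messaging classes; the signature bytes are those of the server response messages in the Bolt protocol.

```ruby
# Hypothetical lookup table (an assumption; the real find_message_class
# returns a class and may be implemented differently).
def find_message_name(signature)
  {
    0x70 => :success,
    0x71 => :record,
    0x7E => :ignored,
    0x7F => :failure
  }[signature]
end

# Mirrors the "look up or raise" shape of read_message, using ArgumentError
# in place of the library's ProtocolError to keep the sketch self-contained.
def dispatch(signature, fields)
  name = find_message_name(signature) or
    raise ArgumentError, "Unknown message signature 0x#{signature.to_s(16)}"

  [name, fields]
end

dispatch(0x71, [[1, 2]]) # => [:record, [[1, 2]]]
```

Because the lookup returns nil for an unknown byte, the low-precedence `or` lets the assignment complete first and raises only when nothing matched.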