Class: VectorMCP::Transport::HttpStream::StreamHandler Private

Inherits:
Object
Defined in:
lib/vector_mcp/transport/http_stream/stream_handler.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Handles Server-Sent Events streaming for HTTP transport.

Manages:

  • SSE connection lifecycle

  • Event streaming with resumability

  • Last-Event-ID header processing

  • Connection health monitoring
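
As a rough illustration of the resumability item above, the sketch below shows one way an event store could replay events that a client missed. `SketchEventStore` and `events_after` are hypothetical names invented for this example; only the `store_event(data, type)` call shape comes from the handler source on this page, and the real event store may work differently.

```ruby
# Hypothetical in-memory event store; not VectorMCP's actual implementation.
class SketchEventStore
  Event = Struct.new(:id, :data, :type)

  def initialize(max_events: 100)
    @max_events = max_events
    @events = []
    @next_id = 0
    @mutex = Mutex.new
  end

  # Stores an event and returns its ID, mirroring the
  # store_event(data, type) call shape used by StreamHandler.
  def store_event(data, type)
    @mutex.synchronize do
      @next_id += 1
      id = @next_id.to_s
      @events << Event.new(id, data, type)
      # Drop the oldest events once the retention limit is exceeded.
      @events.shift while @events.size > @max_events
      id
    end
  end

  # Hypothetical helper: returns the events stored after last_event_id.
  # Returns everything still retained when the ID is nil or unknown.
  def events_after(last_event_id)
    @mutex.synchronize do
      index = @events.index { |event| event.id == last_event_id }
      index ? @events[(index + 1)..] : @events.dup
    end
  end
end

store = SketchEventStore.new
first_id = store.store_event('{"n":1}', "message")
store.store_event('{"n":2}', "message")
store.store_event('{"n":3}', "message")

# A client that reconnects with Last-Event-ID set to first_id
# would be sent the two events it has not seen yet.
missed = store.events_after(first_id)
```

Falling back to the full retained history for an unknown ID is a design choice made for this sketch; a real store could equally reject the resume attempt.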

Defined Under Namespace

Classes: StreamingConnection

Constructor Details

#initialize(transport) ⇒ StreamHandler

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initializes a new stream handler.

Parameters:

  • transport (HttpStream)

    The parent transport instance



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 35

def initialize(transport)
  @transport = transport
  @logger = transport.logger
  @active_connections = Concurrent::Hash.new
end

Instance Attribute Details

#logger ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 18

def logger
  @logger
end

#transport ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 18

def transport
  @transport
end

Instance Method Details

#active_connection_count ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Gets the number of active streaming connections.

Returns:

  • (Integer)

    Number of active connections



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 92

def active_connection_count
  @active_connections.size
end

#cleanup_all_connections ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Cleans up all active connections.



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 99

def cleanup_all_connections
  logger.info("Cleaning up all streaming connections: #{@active_connections.size}")

  @active_connections.each_value(&:close)

  @active_connections.clear
end
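
The close-then-clear pattern above can be tried in isolation with the sketch below. `SketchConnection` is a hypothetical stub that only provides the `close` and `closed?` methods the handler calls, and a plain `Hash` stands in for `Concurrent::Hash` so the example needs no gems.

```ruby
# Hypothetical connection stub; StreamingConnection's real interface
# is not shown on this page beyond close and closed?.
class SketchConnection
  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# A plain Hash stands in for Concurrent::Hash to keep the sketch dependency-free.
connections = { "s1" => SketchConnection.new, "s2" => SketchConnection.new }
tracked = connections.values

# Close every connection first, then drop the references.
connections.each_value(&:close)
connections.clear
```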

#handle_streaming_request(env, session) ⇒ Array

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handles a streaming request (GET request for SSE).

Parameters:

  • env (Hash)

    The Rack environment for the request

  • session

    The session the stream belongs to

Returns:

  • (Array)

    Rack response triplet for SSE



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 46

def handle_streaming_request(env, session)
  last_event_id = extract_last_event_id(env)

  logger.info("Starting SSE stream for session #{session.id}")

  headers = build_sse_headers
  body = create_sse_stream(session, last_event_id)

  [200, headers, body]
end
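
The private helpers called above (`extract_last_event_id`, `build_sse_headers`, `create_sse_stream`) are not shown on this page. The sketch below is a guess at the general shape of such a response, assuming the resume ID is read from the standard `Last-Event-ID` request header and that the body is an `Enumerator`, which the `connection.yielder` call in `send_message_to_session` suggests. The `sketch_` method names are hypothetical.

```ruby
# Hypothetical stand-ins for the private helpers; the real ones are not shown here.
def sketch_extract_last_event_id(env)
  # Rack exposes the Last-Event-ID request header under this CGI-style key.
  env["HTTP_LAST_EVENT_ID"]
end

def sketch_build_sse_headers
  {
    "content-type" => "text/event-stream",
    "cache-control" => "no-cache"
  }
end

env = { "REQUEST_METHOD" => "GET", "HTTP_LAST_EVENT_ID" => "42" }
last_event_id = sketch_extract_last_event_id(env)

# An Enumerator can serve as a Rack body because it responds to #each.
body = Enumerator.new do |yielder|
  # Lines starting with ":" are SSE comments and are ignored by clients.
  yielder << ": resuming after #{last_event_id}\n\n"
end

response = [200, sketch_build_sse_headers, body]
```

Lowercase header names are used here because Rack 3 requires them and earlier Rack versions accept them.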

#send_message_to_session(session, message) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sends a message to a specific session.

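Parameters:

  • session

    The session to send the message to

  • message

    The message to send; it is serialized with to_json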

Returns:

  • (Boolean)

    True if message was sent successfully



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 62

def send_message_to_session(session, message)
  return false unless session.streaming?

  connection = @active_connections[session.id]
  return false unless connection && !connection.closed?

  begin
    # Store event for resumability
    event_data = message.to_json
    event_id = @transport.event_store.store_event(event_data, "message")

    # Send via SSE
    sse_event = format_sse_event(event_data, "message", event_id)
    connection.yielder << sse_event

    logger.debug("Message sent to session #{session.id}")

    true
  rescue StandardError => e
    logger.error("Error sending message to session #{session.id}: #{e.message}")

    # Mark connection as closed and clean up
    cleanup_connection(session)
    false
  end
end
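
`format_sse_event` is a private helper whose body is not shown here. As a minimal sketch, assuming it follows the standard Server-Sent Events framing (`id:`, `event:`, and `data:` fields terminated by a blank line), it might look like the following. `sketch_format_sse_event` is a hypothetical name, and the argument order simply mirrors the call above.

```ruby
require "json"

# Hypothetical stand-in for the private format_sse_event helper.
def sketch_format_sse_event(data, type, event_id)
  lines = []
  lines << "id: #{event_id}" if event_id
  lines << "event: #{type}" if type
  # Each line of a multi-line payload needs its own "data:" field.
  data.to_s.split("\n", -1).each { |line| lines << "data: #{line}" }
  # A blank line terminates the event.
  lines.join("\n") + "\n\n"
end

payload = { jsonrpc: "2.0", method: "ping" }.to_json
frame = sketch_format_sse_event(payload, "message", "7")
```

The `id:` field is what a browser or client library echoes back in `Last-Event-ID` on reconnect, which is why the handler stores the event before sending it.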