Class: VectorMCP::Transport::HttpStream::StreamHandler Private
Inherits: Object
- Object
- VectorMCP::Transport::HttpStream::StreamHandler
Defined in: lib/vector_mcp/transport/http_stream/stream_handler.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Handles Server-Sent Events streaming for HTTP transport.
Manages:
- SSE connection lifecycle
- Event streaming with resumability
- Last-Event-ID header processing
- Connection health monitoring
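Resumability can be illustrated with a minimal in-memory event store. This is a sketch under stated assumptions, not the transport's actual event store: the `SketchEventStore` class and its `events_after` method are hypothetical names introduced here, and only the `store_event(data, type)` call shape is taken from the method listings on this page.

```ruby
# Hypothetical in-memory event store, for illustration only.
# Only the store_event(data, type) call shape appears in the documented source.
class SketchEventStore
  Event = Struct.new(:id, :type, :data)

  def initialize
    @events = []
    @next_id = 0
    @mutex = Mutex.new
  end

  # Records an event and returns its ID as a string.
  def store_event(data, type)
    @mutex.synchronize do
      @next_id += 1
      id = @next_id.to_s
      @events << Event.new(id, type, data)
      id
    end
  end

  # Returns the events recorded after the given ID.
  # Falls back to every stored event when the ID is nil or unknown.
  def events_after(last_event_id)
    @mutex.synchronize do
      index = @events.index { |event| event.id == last_event_id }
      index ? @events[(index + 1)..] : @events.dup
    end
  end
end

store = SketchEventStore.new
first_id = store.store_event('{"n":1}', "message")
store.store_event('{"n":2}', "message")
store.store_event('{"n":3}', "message")

# A client reconnecting with Last-Event-ID set to first_id would be replayed these.
missed = store.events_after(first_id).map(&:data)
```

A reconnecting client sends the ID of the last event it received, and the server replays everything stored after it before resuming the live stream.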
Defined Under Namespace
Classes: StreamingConnection
Instance Attribute Summary
- #logger ⇒ Object readonly private
- #transport ⇒ Object readonly private
Instance Method Summary
- #active_connection_count ⇒ Integer private
  Gets the number of active streaming connections.
- #cleanup_all_connections ⇒ void private
  Cleans up all active connections.
- #handle_streaming_request(env, session) ⇒ Array private
  Handles a streaming request (GET request for SSE).
- #initialize(transport) ⇒ StreamHandler constructor private
  Initializes a new stream handler.
- #send_message_to_session(session, message) ⇒ Boolean private
  Sends a message to a specific session.
Constructor Details
#initialize(transport) ⇒ StreamHandler
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Initializes a new stream handler.
# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 35
def initialize(transport)
  @transport = transport
  @logger = transport.logger
  @active_connections = Concurrent::Hash.new
end
Instance Attribute Details
#logger ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 18
def logger
  @logger
end
#transport ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 18
def transport
  @transport
end
Instance Method Details
#active_connection_count ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Gets the number of active streaming connections.
# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 92
def active_connection_count
  @active_connections.size
end
#cleanup_all_connections ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Cleans up all active connections.
# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 99
def cleanup_all_connections
  logger.info("Cleaning up all streaming connections: #{@active_connections.size}")

  @active_connections.each_value(&:close)
  @active_connections.clear
end
#handle_streaming_request(env, session) ⇒ Array
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handles a streaming request (GET request for SSE).
# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 46
def handle_streaming_request(env, session)
  last_event_id = extract_last_event_id(env)

  logger.info("Starting SSE stream for session #{session.id}")

  headers = build_sse_headers
  body = create_sse_stream(session, last_event_id)

  [200, headers, body]
end
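The helpers called above (`extract_last_event_id`, `build_sse_headers`, `create_sse_stream`) are private and their bodies are not shown on this page. The sketch below shows one plausible shape for the Rack response triple this method returns. The function names `last_event_id_from`, `sse_headers`, and `sse_body` are hypothetical stand-ins, and the header set is an assumption rather than the library's actual output.

```ruby
# Hypothetical stand-ins for the private helpers; not the library's code.

# Rack exposes the Last-Event-ID request header under this CGI-style key.
def last_event_id_from(env)
  env["HTTP_LAST_EVENT_ID"]
end

# Assumed minimal header set for an SSE response.
def sse_headers
  {
    "content-type" => "text/event-stream",
    "cache-control" => "no-cache"
  }
end

# A Rack body only has to respond to #each; an Enumerator satisfies that
# and lets frames be produced lazily through its yielder.
def sse_body(frames)
  Enumerator.new do |yielder|
    frames.each { |frame| yielder << frame }
  end
end

env = { "REQUEST_METHOD" => "GET", "HTTP_LAST_EVENT_ID" => "41" }
last_event_id = last_event_id_from(env)
body = sse_body(["id: 42\nevent: message\ndata: {}\n\n"])
response = [200, sse_headers, body]
```

Returning an Enumerator as the body is what makes a `yielder` available for later writes, which matches the `connection.yielder << sse_event` call in `send_message_to_session`.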
#send_message_to_session(session, message) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sends a message to a specific session.
# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 62
def send_message_to_session(session, message)
  return false unless session.streaming?

  connection = @active_connections[session.id]
  return false unless connection && !connection.closed?

  begin
    # Store event for resumability
    event_data = message.to_json
    event_id = @transport.event_store.store_event(event_data, "message")

    # Send via SSE
    sse_event = format_sse_event(event_data, "message", event_id)
    connection.yielder << sse_event

    logger.debug("Message sent to session #{session.id}")
    true
  rescue StandardError => e
    logger.error("Error sending message to session #{session.id}: #{e.message}")
    # Mark connection as closed and clean up
    cleanup_connection(session)
    false
  end
end
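The private `format_sse_event` helper is not shown on this page. As a rough illustration of the Server-Sent Events wire format it presumably produces, the sketch below builds a frame from the same three arguments (data, event type, event ID). The name `format_sse_event_sketch` is hypothetical, and the field order is an assumption.

```ruby
require "json"

# Hypothetical formatter following the SSE wire format; not the library's code.
def format_sse_event_sketch(data, type, event_id)
  lines = []
  lines << "id: #{event_id}" if event_id
  lines << "event: #{type}" if type
  # Each line of the payload needs its own "data:" field.
  data.to_s.split("\n", -1).each { |line| lines << "data: #{line}" }
  # A blank line terminates the event.
  "#{lines.join("\n")}\n\n"
end

payload = { jsonrpc: "2.0", method: "ping" }.to_json
frame = format_sse_event_sketch(payload, "message", "7")
```

Including the `id:` field is what lets a browser or client library send `Last-Event-ID` on reconnect, so the stored event ID and the ID written to the stream should be the same value.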