Class: VectorMCP::Transport::HttpStream
- Inherits:
-
Object
- Object
- VectorMCP::Transport::HttpStream
- Defined in:
- lib/vector_mcp/transport/http_stream.rb,
lib/vector_mcp/transport/http_stream/event_store.rb,
lib/vector_mcp/transport/http_stream/stream_handler.rb,
lib/vector_mcp/transport/http_stream/session_manager.rb
Overview
Implements the Model Context Protocol transport over HTTP with streaming support according to the MCP specification for Streamable HTTP transport.
This transport supports:
-
Client-to-server communication via HTTP POST
-
Optional server-to-client streaming via Server-Sent Events (SSE)
-
Session management with Mcp-Session-Id headers
-
Resumable connections with event IDs and Last-Event-ID support
-
Bidirectional communication patterns
Endpoints:
-
POST /mcp - Client sends JSON-RPC requests
-
GET /mcp - Optional SSE streaming for server-initiated messages
-
DELETE /mcp - Session termination
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Classes: EventStore, SessionManager, StreamHandler
Constant Summary collapse
- DEFAULT_HOST =
Default configuration values
"localhost"
- DEFAULT_PORT =
8000
- DEFAULT_PATH_PREFIX =
"/mcp"
- DEFAULT_SESSION_TIMEOUT =
5 minutes
300
- DEFAULT_EVENT_RETENTION =
Keep last 100 events for resumability
100
- DEFAULT_REQUEST_TIMEOUT =
Default timeout for server-initiated requests
30
Instance Attribute Summary collapse
-
#event_store ⇒ HttpStream::EventStore
readonly
private
Provides access to event store for internal components.
-
#host ⇒ String
readonly
The hostname or IP address the server will bind to.
-
#logger ⇒ Logger
readonly
The logger instance, shared with the server.
-
#path_prefix ⇒ String
readonly
The base URL path for MCP endpoints.
-
#port ⇒ Integer
readonly
The port number the server will listen on.
-
#server ⇒ VectorMCP::Server
readonly
The server instance this transport is bound to.
-
#session_manager ⇒ HttpStream::SessionManager
readonly
private
Provides access to session manager for internal components.
-
#stream_handler ⇒ HttpStream::StreamHandler
readonly
private
Provides access to stream handler for internal components.
Instance Method Summary collapse
-
#broadcast_notification(method, params = nil) ⇒ Integer
Broadcasts a notification to all active sessions.
-
#call(env) ⇒ Array(Integer, Hash, Object)
Handles incoming HTTP requests (Rack interface).
-
#initialize(server, options = {}) ⇒ HttpStream
constructor
Initializes a new HTTP Stream transport.
-
#run ⇒ void
Starts the HTTP Stream transport.
-
#send_notification(method, params = nil) ⇒ Boolean
Sends a notification to the first available session.
-
#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean
Sends a notification to a specific session.
-
#send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object
Sends a server-initiated JSON-RPC request compatible with Session expectations.
-
#send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object
Sends a server-initiated JSON-RPC request to a specific session and waits for a response.
-
#stop ⇒ void
Stops the transport and cleans up resources.
Constructor Details
#initialize(server, options = {}) ⇒ HttpStream
Initializes a new HTTP Stream transport.
66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 66 def initialize(server, = {}) @server = server @logger = server.logger initialize_configuration() initialize_components initialize_request_tracking initialize_object_pools initialize_server_state logger.info { "HttpStream transport initialized: #{@host}:#{@port}#{@path_prefix}" } end |
Instance Attribute Details
#event_store ⇒ HttpStream::EventStore (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides access to event store for internal components.
225 226 227 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 225 def event_store @event_store end |
#host ⇒ String (readonly)
The hostname or IP address the server will bind to
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def host @host end |
#logger ⇒ Logger (readonly)
The logger instance, shared with the server
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def logger @logger end |
#path_prefix ⇒ String (readonly)
The base URL path for MCP endpoints
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def path_prefix @path_prefix end |
#port ⇒ Integer (readonly)
The port number the server will listen on
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def port @port end |
#server ⇒ VectorMCP::Server (readonly)
The server instance this transport is bound to
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def server @server end |
#session_manager ⇒ HttpStream::SessionManager (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides access to session manager for internal components.
219 220 221 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 219 def session_manager @session_manager end |
#stream_handler ⇒ HttpStream::StreamHandler (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides access to stream handler for internal components.
231 232 233 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 231 def stream_handler @stream_handler end |
Instance Method Details
#broadcast_notification(method, params = nil) ⇒ Integer
Broadcasts a notification to all active sessions.
141 142 143 144 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 141 def broadcast_notification(method, params = nil) = build_notification(method, params) @session_manager.() end |
#call(env) ⇒ Array(Integer, Hash, Object)
Handles incoming HTTP requests (Rack interface). Routes requests to appropriate handlers based on path and method.
94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 94 def call(env) start_time = Time.now path = env["PATH_INFO"] method = env["REQUEST_METHOD"] # Processing HTTP request response = route_request(path, method, env) log_request_completion(method, path, start_time, response[0]) response rescue StandardError => e handle_request_error(method, path, e) end |
#run ⇒ void
This method returns an undefined value.
Starts the HTTP Stream transport. This method will block until the server is stopped.
83 84 85 86 87 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 83 def run start_puma_server rescue StandardError => e handle_fatal_error(e) end |
#send_notification(method, params = nil) ⇒ Boolean
Sends a notification to the first available session.
113 114 115 116 117 118 119 120 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 113 def send_notification(method, params = nil) # Find the first available session first_session = find_first_session return false unless first_session = build_notification(method, params) @stream_handler.(first_session, ) end |
#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean
Sends a notification to a specific session.
128 129 130 131 132 133 134 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 128 def send_notification_to_session(session_id, method, params = nil) session = @session_manager.get_session(session_id) return false unless session = build_notification(method, params) @stream_handler.(session, ) end |
#send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object
Sends a server-initiated JSON-RPC request compatible with Session expectations. This method will block until a response is received or the timeout is reached. For HTTP transport, this requires finding an appropriate session with streaming connection.
156 157 158 159 160 161 162 163 164 165 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 156 def send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty? # Find the first session with streaming connection # In HTTP transport, we need an active streaming connection to send server-initiated requests streaming_session = find_streaming_session raise ArgumentError, "No streaming session available for server-initiated requests" unless streaming_session send_request_to_session(streaming_session.id, method, params, timeout: timeout) end |
#send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object
Sends a server-initiated JSON-RPC request to a specific session and waits for a response. This method will block until a response is received or the timeout is reached.
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 177 def send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty? raise ArgumentError, "Session ID cannot be blank" if session_id.to_s.strip.empty? session = @session_manager.get_session(session_id) raise ArgumentError, "Session not found: #{session_id}" unless session raise ArgumentError, "Session must have streaming connection for server-initiated requests" unless session.streaming? request_id = generate_request_id request_payload = { jsonrpc: "2.0", id: request_id, method: method } request_payload[:params] = params if params setup_request_tracking(request_id) # Sending request to session # Send request via existing streaming connection unless @stream_handler.(session, request_payload) cleanup_request_tracking(request_id) raise VectorMCP::SamplingError, "Failed to send request to session #{session_id}" end response = wait_for_response(request_id, method, timeout) process_response(response, request_id, method) end |
#stop ⇒ void
This method returns an undefined value.
Stops the transport and cleans up resources.
206 207 208 209 210 211 212 213 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 206 def stop logger.info { "Stopping HttpStream transport" } @running = false cleanup_all_pending_requests @session_manager.cleanup_all_sessions @puma_server&.stop logger.info { "HttpStream transport stopped" } end |