Class: VectorMCP::Transport::SSE
- Inherits:
-
Object
- Object
- VectorMCP::Transport::SSE
- Defined in:
- lib/vector_mcp/transport/sse.rb,
lib/vector_mcp/transport/sse/puma_config.rb,
lib/vector_mcp/transport/sse/stream_manager.rb,
lib/vector_mcp/transport/sse/message_handler.rb,
lib/vector_mcp/transport/sse/client_connection.rb
Overview
Implements the Model Context Protocol transport over HTTP using Server-Sent Events (SSE) for server-to-client messages and HTTP POST for client-to-server messages. This transport uses Puma as the HTTP server with Ruby threading for concurrency.
It provides two main HTTP endpoints:
-
SSE Endpoint (‘<path_prefix>/sse`): Clients connect here via GET to establish an SSE stream. The server sends an initial `event: endpoint` with a unique URL for the client to POST messages back. Subsequent messages from the server (responses, notifications) are sent as `event: message`.
-
Message Endpoint (‘<path_prefix>/message`): Clients POST JSON-RPC messages here. The `session_id` (obtained from the SSE endpoint event) must be included as a query parameter. The server responds with a 202 Accepted and then sends the actual JSON-RPC response/error asynchronously over the client’s established SSE stream.
Defined Under Namespace
Classes: ClientConnection, MessageHandler, PumaConfig, StreamManager
Instance Attribute Summary collapse
-
#host ⇒ String
readonly
The hostname or IP address the server will bind to.
-
#logger ⇒ Logger
readonly
The logger instance, shared with the server.
-
#path_prefix ⇒ String
readonly
The base URL path for MCP endpoints (e.g., “/mcp”).
-
#port ⇒ Integer
readonly
The port number the server will listen on.
-
#server ⇒ VectorMCP::Server
readonly
The server instance this transport is bound to.
-
#session_manager ⇒ Object
readonly
Returns the value of attribute session_manager.
Instance Method Summary collapse
-
#broadcast_notification(method, params = nil) ⇒ void
Broadcasts a JSON-RPC notification to all currently connected client sessions.
-
#build_rack_app(session = nil) ⇒ self
Provides compatibility for tests that expect a ‘build_rack_app` helper.
-
#call(env) ⇒ Array(Integer, Hash, Object)
Handles incoming HTTP requests.
-
#cleanup_clients ⇒ Object
Cleans up all client connections (legacy mode).
-
#initialize(server, options = {}) ⇒ SSE
constructor
Initializes a new SSE transport.
-
#run ⇒ void
Starts the SSE transport, creating a shared session and launching the Puma server.
-
#send_notification(method, params = nil) ⇒ Boolean
Sends a JSON-RPC notification to the first available client session.
-
#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean
Sends a JSON-RPC notification to a specific client session via its SSE stream.
-
#stop ⇒ Object
Stops the transport and cleans up resources.
Constructor Details
#initialize(server, options = {}) ⇒ SSE
Initializes a new SSE transport.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/vector_mcp/transport/sse.rb', line 56 def initialize(server, = {}) @server = server @logger = server.logger @host = [:host] || "localhost" @port = [:port] || 8000 prefix = [:path_prefix] || "/mcp" @path_prefix = prefix.start_with?("/") ? prefix : "/#{prefix}" @path_prefix = @path_prefix.delete_suffix("/") @sse_path = "#{@path_prefix}/sse" @message_path = "#{@path_prefix}/message" # Thread-safe client storage using concurrent-ruby (legacy approach) @clients = Concurrent::Hash.new @session = nil # Global session for this transport instance, initialized in run # Initialize session manager for secure multi-client session isolation (default behavior) # Legacy shared session behavior can be enabled with disable_session_manager: true (deprecated) if [:disable_session_manager] logger.warn("[DEPRECATED] SSE shared session mode is deprecated and poses security risks in multi-client scenarios. " \ "Consider removing disable_session_manager: true to use secure per-client sessions.") @session_manager = nil else @session_manager = SseSessionManager.new(self) end @puma_server = nil @running = false logger.debug { "SSE Transport initialized with prefix: #{@path_prefix}, SSE path: #{@sse_path}, Message path: #{@message_path}" } end |
Instance Attribute Details
#host ⇒ String (readonly)
The hostname or IP address the server will bind to.
44 45 46 |
# File 'lib/vector_mcp/transport/sse.rb', line 44 def host @host end |
#logger ⇒ Logger (readonly)
The logger instance, shared with the server.
44 45 46 |
# File 'lib/vector_mcp/transport/sse.rb', line 44 def logger @logger end |
#path_prefix ⇒ String (readonly)
The base URL path for MCP endpoints (e.g., “/mcp”).
44 45 46 |
# File 'lib/vector_mcp/transport/sse.rb', line 44 def path_prefix @path_prefix end |
#port ⇒ Integer (readonly)
The port number the server will listen on.
44 45 46 |
# File 'lib/vector_mcp/transport/sse.rb', line 44 def port @port end |
#server ⇒ VectorMCP::Server (readonly)
The server instance this transport is bound to.
44 45 46 |
# File 'lib/vector_mcp/transport/sse.rb', line 44 def server @server end |
#session_manager ⇒ Object (readonly)
Returns the value of attribute session_manager.
45 46 47 |
# File 'lib/vector_mcp/transport/sse.rb', line 45 def session_manager @session_manager end |
Instance Method Details
#broadcast_notification(method, params = nil) ⇒ void
This method returns an undefined value.
Broadcasts a JSON-RPC notification to all currently connected client sessions.
163 164 165 166 167 168 169 170 171 |
# File 'lib/vector_mcp/transport/sse.rb', line 163 def broadcast_notification(method, params = nil) # Broadcasting notification to clients = { jsonrpc: "2.0", method: method } [:params] = params if params @clients.each_value do |client_conn| StreamManager.(client_conn, ) end end |
#build_rack_app(session = nil) ⇒ self
Provides compatibility for tests that expect a ‘build_rack_app` helper. Since the transport itself is a Rack app (defines `#call`), it returns `self`.
178 179 180 181 |
# File 'lib/vector_mcp/transport/sse.rb', line 178 def build_rack_app(session = nil) @session = session if session self end |
#call(env) ⇒ Array(Integer, Hash, Object)
Handles incoming HTTP requests. This is the entry point for the Rack application. It routes requests to the appropriate handler based on the path.
107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/vector_mcp/transport/sse.rb', line 107 def call(env) start_time = Time.now path = env["PATH_INFO"] http_method = env["REQUEST_METHOD"] logger.info "Received #{http_method} request for #{path}" status, headers, body = route_request(path, env) log_response(http_method, path, start_time, status) [status, headers, body] rescue StandardError => e handle_call_error(http_method, path, e) end |
#cleanup_clients ⇒ Object
Cleans up all client connections (legacy mode)
196 197 198 199 200 201 202 203 204 |
# File 'lib/vector_mcp/transport/sse.rb', line 196 def cleanup_clients logger.info("Cleaning up #{@clients.size} client connection(s)") @clients.each_value do |client_conn| client_conn.close if client_conn.respond_to?(:close) rescue StandardError => e logger.warn("Error closing client connection: #{e.}") end @clients.clear end |
#run ⇒ void
This method returns an undefined value.
Starts the SSE transport, creating a shared session and launching the Puma server. This method will block until the server is stopped (e.g., via SIGINT/SIGTERM).
91 92 93 94 95 96 97 98 |
# File 'lib/vector_mcp/transport/sse.rb', line 91 def run logger.info("Starting server with Puma SSE transport on #{@host}:#{@port}") # Only create shared session if explicitly using legacy mode (deprecated) create_session unless @session_manager start_puma_server rescue StandardError => e handle_fatal_error(e) end |
#send_notification(method, params = nil) ⇒ Boolean
Sends a JSON-RPC notification to the first available client session. If no clients are connected, returns false.
129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/vector_mcp/transport/sse.rb', line 129 def send_notification(method, params = nil) return false if @clients.empty? # Send to first available client first_client = @clients.values.first return false unless first_client = { jsonrpc: "2.0", method: method } [:params] = params if params StreamManager.(first_client, ) end |
#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean
Sends a JSON-RPC notification to a specific client session via its SSE stream.
148 149 150 151 152 153 154 155 156 |
# File 'lib/vector_mcp/transport/sse.rb', line 148 def send_notification_to_session(session_id, method, params = nil) = { jsonrpc: "2.0", method: method } [:params] = params if params client_conn = @clients[session_id] return false unless client_conn StreamManager.(client_conn, ) end |
#stop ⇒ Object
Stops the transport and cleans up resources
184 185 186 187 188 189 190 191 192 193 |
# File 'lib/vector_mcp/transport/sse.rb', line 184 def stop @running = false if @session_manager @session_manager.cleanup_all_sessions else cleanup_clients end @puma_server&.stop logger.info("SSE transport stopped") end |