Class: VectorMCP::Transport::SSE

Inherits:
Object
Defined in:
lib/vector_mcp/transport/sse.rb,
lib/vector_mcp/transport/sse/puma_config.rb,
lib/vector_mcp/transport/sse/stream_manager.rb,
lib/vector_mcp/transport/sse/message_handler.rb,
lib/vector_mcp/transport/sse/client_connection.rb

Overview

Implements the Model Context Protocol transport over HTTP using Server-Sent Events (SSE) for server-to-client messages and HTTP POST for client-to-server messages. This transport uses Puma as the HTTP server with Ruby threading for concurrency.

It provides two main HTTP endpoints:

  1. SSE Endpoint (`<path_prefix>/sse`): Clients connect here via GET to establish an SSE stream. The server sends an initial `event: endpoint` with a unique URL for the client to POST messages back. Subsequent messages from the server (responses, notifications) are sent as `event: message`.

  2. Message Endpoint (`<path_prefix>/message`): Clients POST JSON-RPC messages here. The `session_id` (obtained from the SSE endpoint event) must be included as a query parameter. The server responds with a 202 Accepted and then sends the actual JSON-RPC response/error asynchronously over the client’s established SSE stream.
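
The two event types described above can be sketched in plain Ruby. The helper names (`sse_frame`, `parse_sse_frames`), the sample session ID, and the exact shape of the endpoint URL are assumptions made for illustration and are not the library's internals. Only the event names `endpoint` and `message` and the `session_id` query parameter come from the description above.

```ruby
require "json"

# Hypothetical helper: formats one Server-Sent Events frame.
# A frame is an "event:" line, a "data:" line, and a terminating blank line.
def sse_frame(event, data)
  "event: #{event}\ndata: #{data}\n\n"
end

# Hypothetical helper: splits a raw SSE stream into { event:, data: } hashes.
# Handles only single-line "event"/"data" fields, which is enough for this sketch.
def parse_sse_frames(raw)
  raw.split("\n\n").map do |frame|
    fields = frame.lines.map { |line| line.chomp.split(": ", 2) }.to_h
    { event: fields["event"], data: fields["data"] }
  end
end

# Assumed shape of the first event: the URL the client should POST to.
endpoint = sse_frame("endpoint", "/mcp/message?session_id=abc123")
# Later events carry JSON-RPC payloads as single-line JSON.
reply = sse_frame("message", JSON.generate({ jsonrpc: "2.0", id: 1, result: {} }))

events = parse_sse_frames(endpoint + reply)
post_url = events.first[:data] # where the client would send its JSON-RPC requests
puts post_url
```

A real client would read the stream incrementally rather than from a single string, but the framing is the same.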

Examples:

Basic Usage with a Server

server = VectorMCP::Server.new("my-sse-server")
# ... register tools, resources, prompts ...
transport = VectorMCP::Transport::SSE.new(server, port: 8080)
server.run(transport: transport)

Defined Under Namespace

Classes: ClientConnection, MessageHandler, PumaConfig, StreamManager

Constructor Details

#initialize(server, options = {}) ⇒ SSE

Initializes a new SSE transport.

Parameters:

  • server (VectorMCP::Server)

    The server instance that will handle messages.

  • options (Hash) (defaults to: {})

    Configuration options for the transport.

Options Hash (options):

  • :host (String) — default: "localhost"

    The hostname or IP to bind to.

  • :port (Integer) — default: 8000

    The port to listen on.

  • :path_prefix (String) — default: "/mcp"

    The base path for HTTP endpoints.

  • :disable_session_manager (Boolean) — default: false

    DEPRECATED: Whether to disable secure session isolation. When false (default), each client gets isolated sessions. When true, all clients share a global session (security risk).



# File 'lib/vector_mcp/transport/sse.rb', line 56

def initialize(server, options = {})
  @server = server
  @logger = server.logger
  @host = options[:host] || "localhost"
  @port = options[:port] || 8000
  prefix = options[:path_prefix] || "/mcp"
  @path_prefix = prefix.start_with?("/") ? prefix : "/#{prefix}"
  @path_prefix = @path_prefix.delete_suffix("/")
  @sse_path = "#{@path_prefix}/sse"
  @message_path = "#{@path_prefix}/message"

  # Thread-safe client storage using concurrent-ruby (legacy approach)
  @clients = Concurrent::Hash.new
  @session = nil # Global session for this transport instance, initialized in run

  # Initialize session manager for secure multi-client session isolation (default behavior)
  # Legacy shared session behavior can be enabled with disable_session_manager: true (deprecated)
  if options[:disable_session_manager]
    logger.warn("[DEPRECATED] SSE shared session mode is deprecated and poses security risks in multi-client scenarios. " \
                "Consider removing disable_session_manager: true to use secure per-client sessions.")
    @session_manager = nil
  else
    @session_manager = SseSessionManager.new(self)
  end
  @puma_server = nil
  @running = false

  logger.debug { "SSE Transport initialized with prefix: #{@path_prefix}, SSE path: #{@sse_path}, Message path: #{@message_path}" }
end
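
The prefix handling in the constructor can be tried out in isolation. The sketch below copies the same two normalization steps into a standalone helper; the name `endpoint_paths` is hypothetical and exists only for this example.

```ruby
# Hypothetical standalone helper mirroring the constructor's prefix handling:
# ensure a leading slash, drop one trailing slash, then derive both endpoint paths.
def endpoint_paths(prefix = "/mcp")
  normalized = prefix.start_with?("/") ? prefix : "/#{prefix}"
  normalized = normalized.delete_suffix("/")
  { sse: "#{normalized}/sse", message: "#{normalized}/message" }
end

p endpoint_paths("/mcp") # leading slash already present, nothing to change
p endpoint_paths("api/") # leading slash added, trailing slash removed
p endpoint_paths("/")    # a bare "/" collapses to an empty prefix
```

Note that `delete_suffix` removes a single trailing slash, so a prefix such as `"api//"` would still end in a slash after normalization.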

Instance Attribute Details

#host ⇒ String (readonly)

The hostname or IP address the server will bind to.

Returns:

  • (String)

    the current value of host



# File 'lib/vector_mcp/transport/sse.rb', line 44

def host
  @host
end

#logger ⇒ Logger (readonly)

The logger instance, shared with the server.

Returns:

  • (Logger)

    the current value of logger



# File 'lib/vector_mcp/transport/sse.rb', line 44

def logger
  @logger
end

#path_prefix ⇒ String (readonly)

The base URL path for MCP endpoints (e.g., “/mcp”).

Returns:

  • (String)

    the current value of path_prefix



# File 'lib/vector_mcp/transport/sse.rb', line 44

def path_prefix
  @path_prefix
end

#port ⇒ Integer (readonly)

The port number the server will listen on.

Returns:

  • (Integer)

    the current value of port



# File 'lib/vector_mcp/transport/sse.rb', line 44

def port
  @port
end

#server ⇒ VectorMCP::Server (readonly)

The server instance this transport is bound to.

Returns:

  • (VectorMCP::Server)

    the current value of server


# File 'lib/vector_mcp/transport/sse.rb', line 44

def server
  @server
end

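#session_manager ⇒ Object (readonly)

Returns the value of attribute session_manager. This is `nil` when the transport was created with `disable_session_manager: true` (deprecated legacy mode).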



# File 'lib/vector_mcp/transport/sse.rb', line 45

def session_manager
  @session_manager
end

Instance Method Details

#broadcast_notification(method, params = nil) ⇒ void

This method returns an undefined value.

Broadcasts a JSON-RPC notification to all currently connected client sessions.

Parameters:

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).



# File 'lib/vector_mcp/transport/sse.rb', line 163

def broadcast_notification(method, params = nil)
  # Broadcasting notification to clients
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params

  @clients.each_value do |client_conn|
    StreamManager.enqueue_message(client_conn, message)
  end
end
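
The message assembled in the method body can be reproduced on its own to see what ends up on the wire. The helper name `build_notification` is hypothetical, and the method names passed to it are example values only.

```ruby
require "json"

# Hypothetical helper mirroring how the method above assembles a notification.
# A JSON-RPC 2.0 notification carries no "id", so no response is expected.
def build_notification(method, params = nil)
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params # the key is omitted entirely when params is nil
  message
end

with_params = build_notification("notifications/progress", { progress: 50 })
without_params = build_notification("notifications/tools/list_changed")

puts JSON.generate(with_params)
puts JSON.generate(without_params)
```

Because the check is a plain truthiness test, an empty hash or array is still sent as `params`, while `nil` drops the key.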

#build_rack_app(session = nil) ⇒ self

Provides compatibility for tests that expect a `build_rack_app` helper. Since the transport itself is a Rack app (defines `#call`), it returns `self`.

Parameters:

  • session (VectorMCP::Session, nil) (defaults to: nil)

    An optional session to persist for testing.

Returns:

  • (self)

    The transport instance itself.



# File 'lib/vector_mcp/transport/sse.rb', line 178

def build_rack_app(session = nil)
  @session = session if session
  self
end

#call(env) ⇒ Array(Integer, Hash, Object)

Handles incoming HTTP requests. This is the entry point for the Rack application. It routes requests to the appropriate handler based on the path.

Parameters:

  • env (Hash)

    The Rack environment hash.

Returns:

  • (Array(Integer, Hash, Object))

    A standard Rack response triplet: [status, headers, body].



# File 'lib/vector_mcp/transport/sse.rb', line 107

def call(env)
  start_time = Time.now
  path = env["PATH_INFO"]
  http_method = env["REQUEST_METHOD"]
  logger.info "Received #{http_method} request for #{path}"

  status, headers, body = route_request(path, env)

  log_response(http_method, path, start_time, status)
  [status, headers, body]
rescue StandardError => e
  handle_call_error(http_method, path, e)
end
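
The private `route_request` helper is not shown on this page. As a rough illustration of path-based routing in a Rack-style `#call`, here is a minimal sketch. The class name `TinyRouter`, the response bodies, and the 200/404/500 statuses are assumptions; only the two paths and the 202 status for the message endpoint come from the overview.

```ruby
# Minimal Rack-style app: any object whose #call(env) returns
# [status, headers, body] qualifies, so no Rack gem is needed to try it.
class TinyRouter
  def initialize(path_prefix = "/mcp")
    @sse_path = "#{path_prefix}/sse"
    @message_path = "#{path_prefix}/message"
  end

  # Routes on PATH_INFO and returns the Rack triplet.
  def call(env)
    case env["PATH_INFO"]
    when @sse_path
      [200, { "content-type" => "text/event-stream" }, ["(stream would start here)"]]
    when @message_path
      [202, { "content-type" => "application/json" }, ['{"status":"accepted"}']]
    else
      [404, { "content-type" => "text/plain" }, ["Not Found"]]
    end
  rescue StandardError => e
    # Mirrors the rescue in #call above: errors become a response, not a crash.
    [500, { "content-type" => "text/plain" }, ["Internal Server Error: #{e.message}"]]
  end
end

app = TinyRouter.new
status, _headers, _body = app.call({ "PATH_INFO" => "/mcp/message", "REQUEST_METHOD" => "POST" })
puts status
```

Calling the object directly with a hand-built `env` hash, as in the last lines, is also a convenient way to unit-test a Rack app without starting a server.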

#cleanup_clients ⇒ Object

Cleans up all client connections (legacy mode).



# File 'lib/vector_mcp/transport/sse.rb', line 196

def cleanup_clients
  logger.info("Cleaning up #{@clients.size} client connection(s)")
  @clients.each_value do |client_conn|
    client_conn.close if client_conn.respond_to?(:close)
  rescue StandardError => e
    logger.warn("Error closing client connection: #{e.message}")
  end
  @clients.clear
end
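
The `rescue` inside the block is what keeps one failing connection from aborting the whole cleanup. The sketch below demonstrates that behavior with stand-in objects. `FakeConn`, `FlakyConn`, and `close_all` are hypothetical names, and a plain `Hash` is used in place of `Concurrent::Hash` so the example has no dependencies.

```ruby
# Stand-in for a client connection that closes cleanly.
class FakeConn
  attr_reader :closed

  def close
    @closed = true
  end
end

# Stand-in for a connection whose close raises.
class FlakyConn
  def close
    raise IOError, "socket already gone"
  end
end

# Same structure as cleanup_clients: the rescue belongs to the block,
# so an error is handled per connection and iteration continues.
def close_all(clients, errors = [])
  clients.each_value do |conn|
    conn.close if conn.respond_to?(:close)
  rescue StandardError => e
    errors << e.message
  end
  clients.clear
  errors
end

good = FakeConn.new
clients = { "a" => FlakyConn.new, "b" => good }
errors = close_all(clients)
puts "closed: #{good.closed.inspect}, remaining: #{clients.size}, errors: #{errors.size}"
```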

#run ⇒ void

This method returns an undefined value.

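Starts the SSE transport and launches the Puma server. A shared session is created only in the deprecated legacy mode (`disable_session_manager: true`); otherwise sessions are handled per client by the session manager. This method will block until the server is stopped (e.g., via SIGINT/SIGTERM).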

Raises:

  • (StandardError)

    if there’s a fatal error during server startup.



# File 'lib/vector_mcp/transport/sse.rb', line 91

def run
  logger.info("Starting server with Puma SSE transport on #{@host}:#{@port}")
  # Only create shared session if explicitly using legacy mode (deprecated)
  create_session unless @session_manager
  start_puma_server
rescue StandardError => e
  handle_fatal_error(e)
end

#send_notification(method, params = nil) ⇒ Boolean

Sends a JSON-RPC notification to the first available client session. If no clients are connected, returns false.

Parameters:

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).

Returns:

  • (Boolean)

    True if the message was sent successfully, false otherwise.



# File 'lib/vector_mcp/transport/sse.rb', line 129

def send_notification(method, params = nil)
  return false if @clients.empty?

  # Send to first available client
  first_client = @clients.values.first
  return false unless first_client

  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params

  StreamManager.enqueue_message(first_client, message)
end

#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean

Sends a JSON-RPC notification to a specific client session via its SSE stream.

Parameters:

  • session_id (String)

    The ID of the client session to send the notification to.

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).

Returns:

  • (Boolean)

    True if the message was successfully enqueued, false otherwise (e.g., client not found).



# File 'lib/vector_mcp/transport/sse.rb', line 148

def send_notification_to_session(session_id, method, params = nil)
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params

  client_conn = @clients[session_id]
  return false unless client_conn

  StreamManager.enqueue_message(client_conn, message)
end
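
The lookup-then-enqueue pattern and its return value can be illustrated without the transport. In this sketch, `SessionOutbox` is a hypothetical stand-in that keeps one in-memory `Queue` per session ID. The real `StreamManager.enqueue_message` is not shown on this page, so its behavior is only approximated here.

```ruby
# Hypothetical stand-in: one thread-safe Queue per connected session.
class SessionOutbox
  def initialize
    @queues = {}
  end

  # Registers a session, as would happen when a client opens its SSE stream.
  def connect(session_id)
    @queues[session_id] = Queue.new
  end

  # Returns true when the message was enqueued, false for an unknown session.
  def notify(session_id, method, params = nil)
    queue = @queues[session_id]
    return false unless queue

    message = { jsonrpc: "2.0", method: method }
    message[:params] = params if params
    queue << message
    true
  end

  # Number of messages waiting to be written to the session's stream.
  def pending(session_id)
    @queues.fetch(session_id).size
  end
end

outbox = SessionOutbox.new
outbox.connect("abc123")
delivered = outbox.notify("abc123", "notifications/message", { level: "info" })
missed = outbox.notify("unknown", "notifications/message")
puts "delivered=#{delivered} missed=#{missed} pending=#{outbox.pending("abc123")}"
```

Callers should treat a `false` return as "client not connected" rather than as a delivery failure, since enqueuing says nothing about whether the client has read the message.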

#stop ⇒ Object

Stops the transport and cleans up resources.



# File 'lib/vector_mcp/transport/sse.rb', line 184

def stop
  @running = false
  if @session_manager
    @session_manager.cleanup_all_sessions
  else
    cleanup_clients
  end
  @puma_server&.stop
  logger.info("SSE transport stopped")
end