Class: VectorMCP::Transport::HttpStream

Inherits:
Object
  • Object
show all
Defined in:
lib/vector_mcp/transport/http_stream.rb,
lib/vector_mcp/transport/http_stream/event_store.rb,
lib/vector_mcp/transport/http_stream/stream_handler.rb,
lib/vector_mcp/transport/http_stream/session_manager.rb

Overview

Implements the Model Context Protocol transport over HTTP with streaming support according to the MCP specification for Streamable HTTP transport.

This transport supports:

  • Client-to-server communication via HTTP POST

  • Optional server-to-client streaming via Server-Sent Events (SSE)

  • Session management with Mcp-Session-Id headers

  • Resumable connections with event IDs and Last-Event-ID support

  • Bidirectional communication patterns

Endpoints:

  • POST /mcp - Client sends JSON-RPC requests

  • GET /mcp - Optional SSE streaming for server-initiated messages

  • DELETE /mcp - Session termination

rubocop:disable Metrics/ClassLength

Examples:

Basic Usage

server = VectorMCP::Server.new("http-stream-server")
transport = VectorMCP::Transport::HttpStream.new(server, port: 8080)
server.run(transport: transport)

Defined Under Namespace

Classes: EventStore, SessionManager, StreamHandler

Constant Summary collapse

DEFAULT_HOST =

Default configuration values

"localhost"
DEFAULT_PORT =
8000
DEFAULT_PATH_PREFIX =
"/mcp"
DEFAULT_SESSION_TIMEOUT =

5 minutes

300
DEFAULT_EVENT_RETENTION =

Keep last 100 events for resumability

100
DEFAULT_REQUEST_TIMEOUT =

Default timeout for server-initiated requests

30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ HttpStream

Initializes a new HTTP Stream transport.

Parameters:

  • server (VectorMCP::Server)

    The server instance that will handle messages

  • options (Hash) (defaults to: {})

    Configuration options for the transport

Options Hash (options):

  • :host (String) — default: "localhost"

    The hostname or IP to bind to

  • :port (Integer) — default: 8000

    The port to listen on

  • :path_prefix (String) — default: "/mcp"

    The base path for HTTP endpoints

  • :session_timeout (Integer) — default: 300

    Session timeout in seconds

  • :event_retention (Integer) — default: 100

    Number of events to retain for resumability

  • :allowed_origins (Array<String>) — default: ["*"]

    List of allowed origins for CORS. Use [“*”] to allow all origins.



66
67
68
69
70
71
72
73
74
75
76
# File 'lib/vector_mcp/transport/http_stream.rb', line 66

def initialize(server, options = {})
  @server = server
  @logger = server.logger
  initialize_configuration(options)
  initialize_components
  initialize_request_tracking
  initialize_object_pools
  initialize_server_state

  logger.info { "HttpStream transport initialized: #{@host}:#{@port}#{@path_prefix}" }
end

Instance Attribute Details

#event_storeHttpStream::EventStore (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides access to event store for internal components.



225
226
227
# File 'lib/vector_mcp/transport/http_stream.rb', line 225

def event_store
  @event_store
end

#hostString (readonly)

The hostname or IP address the server will bind to

Returns:

  • (String)

    the current value of host



45
46
47
# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def host
  @host
end

#loggerLogger (readonly)

The logger instance, shared with the server

Returns:

  • (Logger)

    the current value of logger



45
46
47
# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def logger
  @logger
end

#path_prefixString (readonly)

The base URL path for MCP endpoints

Returns:

  • (String)

    the current value of path_prefix



45
46
47
# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def path_prefix
  @path_prefix
end

#portInteger (readonly)

The port number the server will listen on

Returns:

  • (Integer)

    the current value of port



45
46
47
# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def port
  @port
end

#serverVectorMCP::Server (readonly)

The server instance this transport is bound to

Returns:



45
46
47
# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def server
  @server
end

#session_managerHttpStream::SessionManager (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides access to session manager for internal components.



219
220
221
# File 'lib/vector_mcp/transport/http_stream.rb', line 219

def session_manager
  @session_manager
end

#stream_handlerHttpStream::StreamHandler (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides access to stream handler for internal components.



231
232
233
# File 'lib/vector_mcp/transport/http_stream.rb', line 231

def stream_handler
  @stream_handler
end

Instance Method Details

#broadcast_notification(method, params = nil) ⇒ Integer

Broadcasts a notification to all active sessions.

Parameters:

  • method (String)

    The notification method name

  • params (Hash, Array, nil) (defaults to: nil)

    The notification parameters

Returns:

  • (Integer)

    Number of sessions the notification was sent to



141
142
143
144
# File 'lib/vector_mcp/transport/http_stream.rb', line 141

def broadcast_notification(method, params = nil)
  message = build_notification(method, params)
  @session_manager.broadcast_message(message)
end

#call(env) ⇒ Array(Integer, Hash, Object)

Handles incoming HTTP requests (Rack interface). Routes requests to appropriate handlers based on path and method.

Parameters:

  • env (Hash)

    The Rack environment hash

Returns:

  • (Array(Integer, Hash, Object))

    Standard Rack response triplet



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/vector_mcp/transport/http_stream.rb', line 94

def call(env)
  start_time = Time.now
  path = env["PATH_INFO"]
  method = env["REQUEST_METHOD"]

  # Processing HTTP request

  response = route_request(path, method, env)
  log_request_completion(method, path, start_time, response[0])
  response
rescue StandardError => e
  handle_request_error(method, path, e)
end

#runvoid

This method returns an undefined value.

Starts the HTTP Stream transport. This method will block until the server is stopped.

Raises:

  • (StandardError)

    if there’s a fatal error during server startup



83
84
85
86
87
# File 'lib/vector_mcp/transport/http_stream.rb', line 83

def run
  start_puma_server
rescue StandardError => e
  handle_fatal_error(e)
end

#send_notification(method, params = nil) ⇒ Boolean

Sends a notification to the first available session.

Parameters:

  • method (String)

    The notification method name

  • params (Hash, Array, nil) (defaults to: nil)

    The notification parameters

Returns:

  • (Boolean)

    True if notification was sent successfully



113
114
115
116
117
118
119
120
# File 'lib/vector_mcp/transport/http_stream.rb', line 113

def send_notification(method, params = nil)
  # Find the first available session
  first_session = find_first_session
  return false unless first_session

  message = build_notification(method, params)
  @stream_handler.send_message_to_session(first_session, message)
end

#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean

Sends a notification to a specific session.

Parameters:

  • session_id (String)

    The target session ID

  • method (String)

    The notification method name

  • params (Hash, Array, nil) (defaults to: nil)

    The notification parameters

Returns:

  • (Boolean)

    True if notification was sent successfully



128
129
130
131
132
133
134
# File 'lib/vector_mcp/transport/http_stream.rb', line 128

def send_notification_to_session(session_id, method, params = nil)
  session = @session_manager.get_session(session_id)
  return false unless session

  message = build_notification(method, params)
  @stream_handler.send_message_to_session(session, message)
end

#send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object

Sends a server-initiated JSON-RPC request compatible with Session expectations. This method will block until a response is received or the timeout is reached. For HTTP transport, this requires finding an appropriate session with streaming connection.

Parameters:

  • method (String)

    The request method name

  • params (Hash, Array, nil) (defaults to: nil)

    The request parameters

  • timeout (Numeric) (defaults to: DEFAULT_REQUEST_TIMEOUT)

    How long to wait for a response, in seconds

Returns:

  • (Object)

    The result part of the client’s response

Raises:



156
157
158
159
160
161
162
163
164
165
# File 'lib/vector_mcp/transport/http_stream.rb', line 156

def send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT)
  raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty?

  # Find the first session with streaming connection
  # In HTTP transport, we need an active streaming connection to send server-initiated requests
  streaming_session = find_streaming_session
  raise ArgumentError, "No streaming session available for server-initiated requests" unless streaming_session

  send_request_to_session(streaming_session.id, method, params, timeout: timeout)
end

#send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object

Sends a server-initiated JSON-RPC request to a specific session and waits for a response. This method will block until a response is received or the timeout is reached.

Parameters:

  • session_id (String)

    The target session ID

  • method (String)

    The request method name

  • params (Hash, Array, nil) (defaults to: nil)

    The request parameters

  • timeout (Numeric) (defaults to: DEFAULT_REQUEST_TIMEOUT)

    How long to wait for a response, in seconds

Returns:

  • (Object)

    The result part of the client’s response

Raises:



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/vector_mcp/transport/http_stream.rb', line 177

def send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT)
  raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty?
  raise ArgumentError, "Session ID cannot be blank" if session_id.to_s.strip.empty?

  session = @session_manager.get_session(session_id)
  raise ArgumentError, "Session not found: #{session_id}" unless session

  raise ArgumentError, "Session must have streaming connection for server-initiated requests" unless session.streaming?

  request_id = generate_request_id
  request_payload = { jsonrpc: "2.0", id: request_id, method: method }
  request_payload[:params] = params if params

  setup_request_tracking(request_id)
  # Sending request to session

  # Send request via existing streaming connection
  unless @stream_handler.send_message_to_session(session, request_payload)
    cleanup_request_tracking(request_id)
    raise VectorMCP::SamplingError, "Failed to send request to session #{session_id}"
  end

  response = wait_for_response(request_id, method, timeout)
  process_response(response, request_id, method)
end

#stopvoid

This method returns an undefined value.

Stops the transport and cleans up resources.



206
207
208
209
210
211
212
213
# File 'lib/vector_mcp/transport/http_stream.rb', line 206

def stop
  logger.info { "Stopping HttpStream transport" }
  @running = false
  cleanup_all_pending_requests
  @session_manager.cleanup_all_sessions
  @puma_server&.stop
  logger.info { "HttpStream transport stopped" }
end