Class: ModelContextProtocol::Server::StreamableHttpTransport
- Inherits:
-
Object
- Object
- ModelContextProtocol::Server::StreamableHttpTransport
show all
- Defined in:
- lib/model_context_protocol/server/streamable_http_transport.rb,
lib/model_context_protocol/server/streamable_http_transport/event_counter.rb,
lib/model_context_protocol/server/streamable_http_transport/request_store.rb,
lib/model_context_protocol/server/streamable_http_transport/session_store.rb,
lib/model_context_protocol/server/streamable_http_transport/message_poller.rb,
lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb,
lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb,
lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb
Defined Under Namespace
Classes: EventCounter, MessagePoller, NotificationQueue, RequestStore, SessionMessageQueue, SessionStore, StreamRegistry
Constant Summary
collapse
- Response =
Data.define(:id, :result) do
  # Builds the JSON-RPC 2.0 success envelope for this response.
  #
  # @return [Hash] hash with :jsonrpc, :id and :result keys, in that order
  def serialized
    {jsonrpc: "2.0", **to_h}
  end
end
- ErrorResponse =
Data.define(:id, :error) do
  # Builds the JSON-RPC 2.0 error envelope for this response.
  #
  # @return [Hash] hash with :jsonrpc, :id and :error keys, in that order
  def serialized
    {jsonrpc: "2.0", **to_h}
  end
end
Instance Method Summary
collapse
Constructor Details
Returns a new instance of StreamableHttpTransport.
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/model_context_protocol/server/streamable_http_transport.rb', line 18
# Wires up the transport: reads transport options, builds the Redis-backed
# collaborators, and starts the message poller and the stream monitor.
#
# Options read from configuration.transport_options (with their defaults):
# :require_sessions (false), :default_protocol_version ("2025-03-26"),
# :validate_origin (true), :allowed_origins (localhost / 127.0.0.1 over http and https),
# :session_ttl (3600 when missing or falsy).
#
# @param router [Object] stored as @router
# @param configuration [Object] must respond to #transport_options and #logger
def initialize(router:, configuration:)
  @router = router
  @configuration = configuration
  options = @configuration.transport_options

  @redis_pool = ModelContextProtocol::Server::RedisConfig.pool

  # Plain option lookups; each falls back to the default when the key is absent.
  @require_sessions = options.fetch(:require_sessions, false)
  @default_protocol_version = options.fetch(:default_protocol_version, "2025-03-26")
  @session_protocol_versions = {}
  @validate_origin = options.fetch(:validate_origin, true)
  @allowed_origins = options.fetch(
    :allowed_origins,
    %w[http://localhost https://localhost http://127.0.0.1 https://127.0.0.1]
  )

  @redis = ModelContextProtocol::Server::RedisClientProxy.new(@redis_pool)
  @session_store = SessionStore.new(@redis, ttl: options[:session_ttl] || 3600)

  # Identifier for this server process: hostname, pid and a random suffix.
  @server_instance = [Socket.gethostname, Process.pid, SecureRandom.hex(4)].join("-")

  @stream_registry = StreamRegistry.new(@redis, @server_instance)
  @notification_queue = NotificationQueue.new(@redis, @server_instance)
  @event_counter = EventCounter.new(@redis, @server_instance)
  @request_store = RequestStore.new(@redis, @server_instance)

  @stream_monitor_thread = nil
  @message_poller = MessagePoller.new(@redis, @stream_registry, @configuration.logger) { |stream, message|
    send_to_stream(stream, message)
  }

  start_message_poller
  start_stream_monitor
end
|
Instance Method Details
#handle ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/model_context_protocol/server/streamable_http_transport.rb', line 75
# Entry point for one HTTP request: dispatches on the Rack REQUEST_METHOD.
#
# POST, GET and DELETE are forwarded to handle_post_request, handle_sse_request
# and handle_delete_request respectively; any other method yields a 405 payload.
#
# @return [Object] the handler's result, or a Hash with :json and :status (405)
# @raise [ArgumentError] when transport_options has no :env entry
def handle
  @configuration.logger.connect_transport(self)

  rack_env = @configuration.transport_options[:env]
  raise ArgumentError, "StreamableHTTP transport requires Rack env hash in transport_options" unless rack_env

  case rack_env["REQUEST_METHOD"]
  when "POST" then handle_post_request(rack_env)
  when "GET" then handle_sse_request(rack_env)
  when "DELETE" then handle_delete_request(rack_env)
  else
    unsupported = ErrorResponse.new(id: nil, error: {code: -32601, message: "Method not allowed"})
    {json: unsupported.serialized, status: 405}
  end
end
|
#send_notification(method, params) ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/model_context_protocol/server/streamable_http_transport.rb', line 97
# Sends a JSON-RPC 2.0 notification to clients.
#
# When the stream registry reports at least one local stream, the notification
# is handed to deliver_to_active_streams; otherwise it is pushed onto the
# notification queue.
#
# @param method [Object] value placed under the :method key
# @param params [Object] value placed under the :params key
# @return [Object] the result of whichever delivery path was taken
def send_notification(method, params)
  payload = {jsonrpc: "2.0", method:, params:}

  return deliver_to_active_streams(payload) if @stream_registry.has_any_local_streams?

  @notification_queue.push(payload)
end
|
#shutdown ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/model_context_protocol/server/streamable_http_transport.rb', line 50
# Stops background work and releases the resources held by this transport.
#
# In order: stops the message poller (if one exists), kills the stream monitor
# thread and waits up to 5 seconds for it to finish, unregisters every local
# stream and marks its session's stream inactive, then checks the Redis client
# back into the pool. An error while cleaning up one stream is logged and does
# not prevent the remaining streams from being cleaned up.
#
# @return [Object] the result of the final logger call
def shutdown
  @configuration.logger.info("Shutting down StreamableHttpTransport")
  @message_poller&.stop

  if @stream_monitor_thread&.alive?
    @stream_monitor_thread.kill
    # Thread#join takes its time limit as a positional argument. Passing
    # `timeout: 5` hands it a Hash and raises TypeError, which aborted the
    # rest of shutdown before any stream cleanup ran.
    @stream_monitor_thread.join(5)
  end

  @stream_registry.get_all_local_streams.each do |session_id, _stream|
    @stream_registry.unregister_stream(session_id)
    @session_store.mark_stream_inactive(session_id)
  rescue => e
    @configuration.logger.error("Error during stream cleanup", session_id: session_id, error: e.message)
  end

  # NOTE(review): checkin is given the client proxy as an argument; confirm the
  # pool returned by RedisConfig.pool accepts one.
  @redis_pool.checkin(@redis) if @redis_pool && @redis
  @configuration.logger.info("StreamableHttpTransport shutdown complete")
end
|