Class: ModelContextProtocol::Server::StreamableHttpTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/model_context_protocol/server/streamable_http_transport.rb,
lib/model_context_protocol/server/streamable_http_transport/event_counter.rb,
lib/model_context_protocol/server/streamable_http_transport/request_store.rb,
lib/model_context_protocol/server/streamable_http_transport/session_store.rb,
lib/model_context_protocol/server/streamable_http_transport/message_poller.rb,
lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb,
lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb,
lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb

Defined Under Namespace

Classes: EventCounter, MessagePoller, NotificationQueue, RequestStore, SessionMessageQueue, SessionStore, StreamRegistry

Constant Summary collapse

# Immutable value object for a successful JSON-RPC 2.0 reply: a request id
# paired with its result payload.
Response = Data.define(:id, :result) do
  # Builds the hash form of this reply.
  #
  # @return [Hash] hash with :jsonrpc, :id and :result keys
  def serialized
    {jsonrpc: "2.0", id: id, result: result}
  end
end
# Immutable value object for a failed JSON-RPC 2.0 reply: a request id paired
# with an error payload.
ErrorResponse = Data.define(:id, :error) do
  # Builds the hash form of this reply.
  #
  # @return [Hash] hash with :jsonrpc, :id and :error keys
  def serialized
    {jsonrpc: "2.0", id: id, error: error}
  end
end

Instance Method Summary collapse

Constructor Details

#initialize(router:, configuration:) ⇒ StreamableHttpTransport

Returns a new instance of StreamableHttpTransport.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/model_context_protocol/server/streamable_http_transport.rb', line 18

# Builds the transport: reads the transport options, wires up the Redis-backed
# collaborators, then starts the message poller and the stream monitor.
#
# @param router [Object] stored as @router
# @param configuration [Object] must respond to #transport_options and #logger
def initialize(router:, configuration:)
  @router = router
  @configuration = configuration

  options = @configuration.transport_options
  @redis_pool = ModelContextProtocol::Server::RedisConfig.pool

  # Session and protocol settings; each falls back to a default when absent.
  @require_sessions = options.fetch(:require_sessions, false)
  @default_protocol_version = options.fetch(:default_protocol_version, "2025-03-26")
  @session_protocol_versions = {}

  # Origin validation settings.
  @validate_origin = options.fetch(:validate_origin, true)
  @allowed_origins = options.fetch(
    :allowed_origins,
    ["http://localhost", "https://localhost", "http://127.0.0.1", "https://127.0.0.1"]
  )

  @redis = ModelContextProtocol::Server::RedisClientProxy.new(@redis_pool)

  # `||` (not fetch) so an explicit nil/false :session_ttl also gets 3600.
  session_ttl = options[:session_ttl] || 3600
  @session_store = SessionStore.new(@redis, ttl: session_ttl)

  # Identifier for this server instance: hostname, pid and a random suffix.
  @server_instance = "#{Socket.gethostname}-#{Process.pid}-#{SecureRandom.hex(4)}"

  @stream_registry = StreamRegistry.new(@redis, @server_instance)
  @notification_queue = NotificationQueue.new(@redis, @server_instance)
  @event_counter = EventCounter.new(@redis, @server_instance)
  @request_store = RequestStore.new(@redis, @server_instance)

  @stream_monitor_thread = nil
  @message_poller = MessagePoller.new(@redis, @stream_registry, @configuration.logger) { |stream, message| send_to_stream(stream, message) }

  start_message_poller
  start_stream_monitor
end

Instance Method Details

#handle ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/model_context_protocol/server/streamable_http_transport.rb', line 75

# Entry point for one HTTP request: connects the logger to this transport,
# then dispatches on the Rack env's REQUEST_METHOD.
#
# @return [Object] the result of the POST/GET/DELETE handler, or a
#   `{json:, status: 405}` hash for any other HTTP method
# @raise [ArgumentError] when transport_options carries no :env entry
def handle
  @configuration.logger.connect_transport(self)

  rack_env = @configuration.transport_options[:env]
  raise ArgumentError, "StreamableHTTP transport requires Rack env hash in transport_options" unless rack_env

  case rack_env["REQUEST_METHOD"]
  when "POST" then handle_post_request(rack_env)
  when "GET" then handle_sse_request(rack_env)
  when "DELETE" then handle_delete_request(rack_env)
  else
    # Any other HTTP method is rejected with a JSON-RPC error body.
    not_allowed = ErrorResponse[id: nil, error: {code: -32601, message: "Method not allowed"}]
    {json: not_allowed.serialized, status: 405}
  end
end

#send_notification(method, params) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/model_context_protocol/server/streamable_http_transport.rb', line 97

# Wraps the given method and params in a JSON-RPC 2.0 notification and either
# delivers it to active streams or pushes it onto the notification queue.
#
# @param method [Object] value placed under the :method key
# @param params [Object] value placed under the :params key
# @return [Object] the result of the delivery or of the queue push
def send_notification(method, params)
  payload = {jsonrpc: "2.0", method:, params:}

  # Queue the notification only when this instance has no local stream.
  streams_present = @stream_registry.has_any_local_streams?
  return deliver_to_active_streams(payload) if streams_present

  @notification_queue.push(payload)
end

#shutdown ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/model_context_protocol/server/streamable_http_transport.rb', line 50

# Stops background work and releases this instance's streams.
#
# Steps, in order: stop the message poller, terminate the stream monitor
# thread, unregister every local stream (marking its session inactive), and
# hand the Redis client back to the pool. A failure while cleaning up one
# stream is logged and does not stop cleanup of the remaining streams.
def shutdown
  @configuration.logger.info("Shutting down StreamableHttpTransport")

  # Stop the message poller
  @message_poller&.stop

  # Stop the stream monitor thread
  if @stream_monitor_thread&.alive?
    @stream_monitor_thread.kill
    # Thread#join takes its limit as a positional number of seconds. Passing
    # `timeout: 5` hands it a Hash, which raises TypeError and aborts shutdown
    # before any stream is unregistered.
    @stream_monitor_thread.join(5)
  end

  # Unregister all local streams
  @stream_registry.get_all_local_streams.each do |session_id, _stream|
    @stream_registry.unregister_stream(session_id)
    @session_store.mark_stream_inactive(session_id)
  rescue => e
    @configuration.logger.error("Error during stream cleanup", session_id: session_id, error: e.message)
  end

  # NOTE(review): @redis is the RedisClientProxy built in the constructor, not
  # a connection checked out of the pool — confirm the pool's #checkin accepts
  # this argument.
  @redis_pool.checkin(@redis) if @redis_pool && @redis

  @configuration.logger.info("StreamableHttpTransport shutdown complete")
end