Class: ModelContextProtocol::Server::StreamableHttpTransport::RequestStore
- Inherits:
-
Object
- Object
- ModelContextProtocol::Server::StreamableHttpTransport::RequestStore
- Defined in:
- lib/model_context_protocol/server/streamable_http_transport/request_store.rb
Overview
Redis-based distributed storage for tracking active requests and their cancellation status. This store is used by StreamableHttpTransport to manage request lifecycle across multiple server instances and handle cancellation in a distributed environment.
Constant Summary collapse
- REQUEST_KEY_PREFIX =
"request:active:"- CANCELLED_KEY_PREFIX =
"request:cancelled:"- SESSION_KEY_PREFIX =
"request:session:"- DEFAULT_TTL =
1 minute TTL for request entries
60
Instance Method Summary collapse
-
#active?(request_id) ⇒ Boolean
Check if a request is currently active.
-
#cancelled?(request_id) ⇒ Boolean
Check if a request has been cancelled.
-
#cleanup_expired_requests ⇒ Integer
Clean up expired requests based on TTL This method can be called periodically to ensure cleanup.
-
#cleanup_session_requests(session_id) ⇒ Array<String>
Clean up all requests associated with a session This is typically called when a session is terminated.
-
#get_all_active_requests ⇒ Array<String>
Get all active request IDs across all sessions.
-
#get_cancellation_info(request_id) ⇒ Hash?
Get cancellation information for a request.
-
#get_request(request_id) ⇒ Hash?
Get information about a specific request.
-
#get_session_requests(session_id) ⇒ Array<String>
Get all active request IDs for a specific session.
-
#initialize(redis_client, server_instance, ttl: DEFAULT_TTL) ⇒ RequestStore
constructor
A new instance of RequestStore.
-
#mark_cancelled(request_id, reason = nil) ⇒ Boolean
Mark a request as cancelled.
-
#refresh_request_ttl(request_id) ⇒ Boolean
Refresh the TTL for an active request.
-
#register_request(request_id, session_id = nil) ⇒ void
Register a new request with its associated session.
-
#unregister_request(request_id) ⇒ void
Unregister a request (typically called when request completes).
Constructor Details
#initialize(redis_client, server_instance, ttl: DEFAULT_TTL) ⇒ RequestStore
Returns a new instance of RequestStore.
14 15 16 17 18 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 14 def initialize(redis_client, server_instance, ttl: DEFAULT_TTL) @redis = redis_client @server_instance = server_instance @ttl = ttl end |
Instance Method Details
#active?(request_id) ⇒ Boolean
Check if a request is currently active
119 120 121 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 119 def active?(request_id) @redis.exists("#{REQUEST_KEY_PREFIX}#{request_id}") == 1 end |
#cancelled?(request_id) ⇒ Boolean
Check if a request has been cancelled
63 64 65 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 63 def cancelled?(request_id) @redis.exists("#{CANCELLED_KEY_PREFIX}#{request_id}") == 1 end |
#cleanup_expired_requests ⇒ Integer
Clean up expired requests based on TTL This method can be called periodically to ensure cleanup
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 179 def cleanup_expired_requests active_keys = @redis.keys("#{REQUEST_KEY_PREFIX}*") expired_count = 0 key_exists_without_expiration = -1 key_does_not_exist = -2 active_keys.each do |key| ttl = @redis.ttl(key) if ttl == key_exists_without_expiration @redis.expire(key, @ttl) elsif ttl == key_does_not_exist expired_count += 1 end end expired_count end |
#cleanup_session_requests(session_id) ⇒ Array<String>
Clean up all requests associated with a session This is typically called when a session is terminated
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 128 def cleanup_session_requests(session_id) pattern = "#{SESSION_KEY_PREFIX}#{session_id}:*" request_keys = @redis.keys(pattern) return [] if request_keys.empty? # Extract request IDs from the keys request_ids = request_keys.map do |key| key.sub("#{SESSION_KEY_PREFIX}#{session_id}:", "") end # Delete all related keys all_keys = [] request_ids.each do |request_id| all_keys << "#{REQUEST_KEY_PREFIX}#{request_id}" all_keys << "#{CANCELLED_KEY_PREFIX}#{request_id}" end all_keys.concat(request_keys) @redis.del(*all_keys) unless all_keys.empty? request_ids end |
#get_all_active_requests ⇒ Array<String>
Get all active request IDs across all sessions
166 167 168 169 170 171 172 173 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 166 def get_all_active_requests pattern = "#{REQUEST_KEY_PREFIX}*" request_keys = @redis.keys(pattern) request_keys.map do |key| key.sub(REQUEST_KEY_PREFIX, "") end end |
#get_cancellation_info(request_id) ⇒ Hash?
Get cancellation information for a request
71 72 73 74 75 76 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 71 def get_cancellation_info(request_id) data = @redis.get("#{CANCELLED_KEY_PREFIX}#{request_id}") data ? JSON.parse(data) : nil rescue JSON::ParserError nil end |
#get_request(request_id) ⇒ Hash?
Get information about a specific request
108 109 110 111 112 113 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 108 def get_request(request_id) data = @redis.get("#{REQUEST_KEY_PREFIX}#{request_id}") data ? JSON.parse(data) : nil rescue JSON::ParserError nil end |
#get_session_requests(session_id) ⇒ Array<String>
Get all active request IDs for a specific session
154 155 156 157 158 159 160 161 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 154 def get_session_requests(session_id) pattern = "#{SESSION_KEY_PREFIX}#{session_id}:*" request_keys = @redis.keys(pattern) request_keys.map do |key| key.sub("#{SESSION_KEY_PREFIX}#{session_id}:", "") end end |
#mark_cancelled(request_id, reason = nil) ⇒ Boolean
Mark a request as cancelled
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 48 def mark_cancelled(request_id, reason = nil) cancellation_data = { cancelled_at: Time.now.to_f, reason: reason } result = @redis.set("#{CANCELLED_KEY_PREFIX}#{request_id}", cancellation_data.to_json, ex: @ttl) result == "OK" end |
#refresh_request_ttl(request_id) ⇒ Boolean
Refresh the TTL for an active request
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 201 def refresh_request_ttl(request_id) request_data = @redis.get("#{REQUEST_KEY_PREFIX}#{request_id}") return false unless request_data @redis.multi do |multi| multi.expire("#{REQUEST_KEY_PREFIX}#{request_id}", @ttl) multi.expire("#{CANCELLED_KEY_PREFIX}#{request_id}", @ttl) begin data = JSON.parse(request_data) session_id = data["session_id"] if session_id multi.expire("#{SESSION_KEY_PREFIX}#{session_id}:#{request_id}", @ttl) end rescue JSON::ParserError nil end end true end |
#register_request(request_id, session_id = nil) ⇒ void
This method returns an undefined value.
Register a new request with its associated session
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 25 def register_request(request_id, session_id = nil) request_data = { session_id: session_id, server_instance: @server_instance, started_at: Time.now.to_f } @redis.multi do |multi| multi.set("#{REQUEST_KEY_PREFIX}#{request_id}", request_data.to_json, ex: @ttl) if session_id multi.set("#{SESSION_KEY_PREFIX}#{session_id}:#{request_id}", true, ex: @ttl) end end end |
#unregister_request(request_id) ⇒ void
This method returns an undefined value.
Unregister a request (typically called when request completes)
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 82 def unregister_request(request_id) request_data = @redis.get("#{REQUEST_KEY_PREFIX}#{request_id}") keys_to_delete = ["#{REQUEST_KEY_PREFIX}#{request_id}", "#{CANCELLED_KEY_PREFIX}#{request_id}"] if request_data begin data = JSON.parse(request_data) session_id = data["session_id"] if session_id keys_to_delete << "#{SESSION_KEY_PREFIX}#{session_id}:#{request_id}" end rescue JSON::ParserError nil end end @redis.del(*keys_to_delete) unless keys_to_delete.empty? end |