Class: ModelContextProtocol::Server::StreamableHttpTransport::RequestStore

Inherits:
Object
  • Object
show all
Defined in:
lib/model_context_protocol/server/streamable_http_transport/request_store.rb

Overview

Redis-based distributed storage for tracking active requests and their cancellation status. This store is used by StreamableHttpTransport to manage request lifecycle across multiple server instances and handle cancellation in a distributed environment.

Constant Summary collapse

REQUEST_KEY_PREFIX =
"request:active:"
CANCELLED_KEY_PREFIX =
"request:cancelled:"
SESSION_KEY_PREFIX =
"request:session:"
DEFAULT_TTL =

1 minute TTL for request entries

60

Instance Method Summary collapse

Constructor Details

#initialize(redis_client, server_instance, ttl: DEFAULT_TTL) ⇒ RequestStore

Returns a new instance of RequestStore.



14
15
16
17
18
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 14

def initialize(redis_client, server_instance, ttl: DEFAULT_TTL)
  @redis = redis_client
  @server_instance = server_instance
  @ttl = ttl
end

Instance Method Details

#active?(request_id) ⇒ Boolean

Check if a request is currently active

Parameters:

  • request_id (String)

    the unique request identifier

Returns:

  • (Boolean)

    true if the request is active, false otherwise



119
120
121
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 119

def active?(request_id)
  @redis.exists("#{REQUEST_KEY_PREFIX}#{request_id}") == 1
end

#cancelled?(request_id) ⇒ Boolean

Check if a request has been cancelled

Parameters:

  • request_id (String)

    the unique request identifier

Returns:

  • (Boolean)

    true if the request is cancelled, false otherwise



63
64
65
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 63

def cancelled?(request_id)
  @redis.exists("#{CANCELLED_KEY_PREFIX}#{request_id}") == 1
end

#cleanup_expired_requestsInteger

Clean up expired requests based on TTL This method can be called periodically to ensure cleanup

Returns:

  • (Integer)

    number of expired requests cleaned up



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 179

def cleanup_expired_requests
  active_keys = @redis.keys("#{REQUEST_KEY_PREFIX}*")
  expired_count = 0
  key_exists_without_expiration = -1
  key_does_not_exist = -2

  active_keys.each do |key|
    ttl = @redis.ttl(key)
    if ttl == key_exists_without_expiration
      @redis.expire(key, @ttl)
    elsif ttl == key_does_not_exist
      expired_count += 1
    end
  end

  expired_count
end

#cleanup_session_requests(session_id) ⇒ Array<String>

Clean up all requests associated with a session This is typically called when a session is terminated

Parameters:

  • session_id (String)

    the session identifier

Returns:

  • (Array<String>)

    list of cleaned up request IDs



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 128

def cleanup_session_requests(session_id)
  pattern = "#{SESSION_KEY_PREFIX}#{session_id}:*"
  request_keys = @redis.keys(pattern)
  return [] if request_keys.empty?

  # Extract request IDs from the keys
  request_ids = request_keys.map do |key|
    key.sub("#{SESSION_KEY_PREFIX}#{session_id}:", "")
  end

  # Delete all related keys
  all_keys = []
  request_ids.each do |request_id|
    all_keys << "#{REQUEST_KEY_PREFIX}#{request_id}"
    all_keys << "#{CANCELLED_KEY_PREFIX}#{request_id}"
  end
  all_keys.concat(request_keys)

  @redis.del(*all_keys) unless all_keys.empty?
  request_ids
end

#get_all_active_requestsArray<String>

Get all active request IDs across all sessions

Returns:

  • (Array<String>)

    list of all active request IDs



166
167
168
169
170
171
172
173
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 166

def get_all_active_requests
  pattern = "#{REQUEST_KEY_PREFIX}*"
  request_keys = @redis.keys(pattern)

  request_keys.map do |key|
    key.sub(REQUEST_KEY_PREFIX, "")
  end
end

#get_cancellation_info(request_id) ⇒ Hash?

Get cancellation information for a request

Parameters:

  • request_id (String)

    the unique request identifier

Returns:

  • (Hash, nil)

    cancellation data or nil if not cancelled



71
72
73
74
75
76
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 71

def get_cancellation_info(request_id)
  data = @redis.get("#{CANCELLED_KEY_PREFIX}#{request_id}")
  data ? JSON.parse(data) : nil
rescue JSON::ParserError
  nil
end

#get_request(request_id) ⇒ Hash?

Get information about a specific request

Parameters:

  • request_id (String)

    the unique request identifier

Returns:

  • (Hash, nil)

    request information or nil if not found



108
109
110
111
112
113
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 108

def get_request(request_id)
  data = @redis.get("#{REQUEST_KEY_PREFIX}#{request_id}")
  data ? JSON.parse(data) : nil
rescue JSON::ParserError
  nil
end

#get_session_requests(session_id) ⇒ Array<String>

Get all active request IDs for a specific session

Parameters:

  • session_id (String)

    the session identifier

Returns:

  • (Array<String>)

    list of active request IDs for the session



154
155
156
157
158
159
160
161
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 154

def get_session_requests(session_id)
  pattern = "#{SESSION_KEY_PREFIX}#{session_id}:*"
  request_keys = @redis.keys(pattern)

  request_keys.map do |key|
    key.sub("#{SESSION_KEY_PREFIX}#{session_id}:", "")
  end
end

#mark_cancelled(request_id, reason = nil) ⇒ Boolean

Mark a request as cancelled

Parameters:

  • request_id (String)

    the unique request identifier

  • reason (String) (defaults to: nil)

    optional reason for cancellation

Returns:

  • (Boolean)

    true if cancellation was recorded



48
49
50
51
52
53
54
55
56
57
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 48

def mark_cancelled(request_id, reason = nil)
  cancellation_data = {
    cancelled_at: Time.now.to_f,
    reason: reason
  }

  result = @redis.set("#{CANCELLED_KEY_PREFIX}#{request_id}",
    cancellation_data.to_json, ex: @ttl)
  result == "OK"
end

#refresh_request_ttl(request_id) ⇒ Boolean

Refresh the TTL for an active request

Parameters:

  • request_id (String)

    the unique request identifier

Returns:

  • (Boolean)

    true if TTL was refreshed, false if request doesn’t exist



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 201

def refresh_request_ttl(request_id)
  request_data = @redis.get("#{REQUEST_KEY_PREFIX}#{request_id}")
  return false unless request_data

  @redis.multi do |multi|
    multi.expire("#{REQUEST_KEY_PREFIX}#{request_id}", @ttl)
    multi.expire("#{CANCELLED_KEY_PREFIX}#{request_id}", @ttl)

    begin
      data = JSON.parse(request_data)
      session_id = data["session_id"]
      if session_id
        multi.expire("#{SESSION_KEY_PREFIX}#{session_id}:#{request_id}", @ttl)
      end
    rescue JSON::ParserError
      nil
    end
  end

  true
end

#register_request(request_id, session_id = nil) ⇒ void

This method returns an undefined value.

Register a new request with its associated session

Parameters:

  • request_id (String)

    the unique request identifier

  • session_id (String) (defaults to: nil)

    the session identifier (can be nil for sessionless requests)



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 25

def register_request(request_id, session_id = nil)
  request_data = {
    session_id: session_id,
    server_instance: @server_instance,
    started_at: Time.now.to_f
  }

  @redis.multi do |multi|
    multi.set("#{REQUEST_KEY_PREFIX}#{request_id}",
      request_data.to_json, ex: @ttl)

    if session_id
      multi.set("#{SESSION_KEY_PREFIX}#{session_id}:#{request_id}",
        true, ex: @ttl)
    end
  end
end

#unregister_request(request_id) ⇒ void

This method returns an undefined value.

Unregister a request (typically called when request completes)

Parameters:

  • request_id (String)

    the unique request identifier



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/model_context_protocol/server/streamable_http_transport/request_store.rb', line 82

def unregister_request(request_id)
  request_data = @redis.get("#{REQUEST_KEY_PREFIX}#{request_id}")

  keys_to_delete = ["#{REQUEST_KEY_PREFIX}#{request_id}",
    "#{CANCELLED_KEY_PREFIX}#{request_id}"]

  if request_data
    begin
      data = JSON.parse(request_data)
      session_id = data["session_id"]

      if session_id
        keys_to_delete << "#{SESSION_KEY_PREFIX}#{session_id}:#{request_id}"
      end
    rescue JSON::ParserError
      nil
    end
  end

  @redis.del(*keys_to_delete) unless keys_to_delete.empty?
end