Class: ModelContextProtocol::Server::StreamableHttpTransport::StreamRegistry
- Inherits:
-
Object
- Object
- ModelContextProtocol::Server::StreamableHttpTransport::StreamRegistry
- Defined in:
- lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb
Constant Summary collapse
- STREAM_KEY_PREFIX =
"stream:active:"- HEARTBEAT_KEY_PREFIX =
"stream:heartbeat:"- DEFAULT_TTL =
1 minute TTL for stream entries
60
Instance Method Summary collapse
- #cleanup_expired_streams ⇒ Object
- #get_all_local_streams ⇒ Object
- #get_local_stream(session_id) ⇒ Object
- #get_stale_streams(max_age_seconds = 90) ⇒ Object
- #get_stream_server(session_id) ⇒ Object
- #has_any_local_streams? ⇒ Boolean
- #has_local_stream?(session_id) ⇒ Boolean
-
#initialize(redis_client, server_instance, ttl: DEFAULT_TTL) ⇒ StreamRegistry
constructor
A new instance of StreamRegistry.
- #refresh_heartbeat(session_id) ⇒ Object
- #register_stream(session_id, stream) ⇒ Object
- #stream_active?(session_id) ⇒ Boolean
- #unregister_stream(session_id) ⇒ Object
Constructor Details
#initialize(redis_client, server_instance, ttl: DEFAULT_TTL) ⇒ StreamRegistry
Returns a new instance of StreamRegistry.
12 13 14 15 16 17 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 12 def initialize(redis_client, server_instance, ttl: DEFAULT_TTL) @redis = redis_client @server_instance = server_instance @ttl = ttl @local_streams = {} # Keep local reference for direct stream access end |
Instance Method Details
#cleanup_expired_streams ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 69 def cleanup_expired_streams # Get all local stream session IDs local_session_ids = @local_streams.keys # Check which ones are still active in Redis pipeline_results = @redis.pipelined do |pipeline| local_session_ids.each do |session_id| pipeline.exists("#{STREAM_KEY_PREFIX}#{session_id}") end end # Remove expired streams from local storage expired_sessions = [] local_session_ids.each_with_index do |session_id, index| if pipeline_results[index] == 0 # Stream expired in Redis @local_streams.delete(session_id) expired_sessions << session_id end end expired_sessions end |
#get_all_local_streams ⇒ Object
61 62 63 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 61 def get_all_local_streams @local_streams.dup end |
#get_local_stream(session_id) ⇒ Object
38 39 40 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 38 def get_local_stream(session_id) @local_streams[session_id] end |
#get_stale_streams(max_age_seconds = 90) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 92 def get_stale_streams(max_age_seconds = 90) current_time = Time.now.to_f stale_streams = [] # Get all heartbeat keys heartbeat_keys = @redis.keys("#{HEARTBEAT_KEY_PREFIX}*") return stale_streams if heartbeat_keys.empty? # Get all heartbeat timestamps heartbeat_values = @redis.mget(heartbeat_keys) heartbeat_keys.each_with_index do |key, index| next unless heartbeat_values[index] session_id = key.sub(HEARTBEAT_KEY_PREFIX, "") last_heartbeat = heartbeat_values[index].to_f if current_time - last_heartbeat > max_age_seconds stale_streams << session_id end end stale_streams end |
#get_stream_server(session_id) ⇒ Object
46 47 48 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 46 def get_stream_server(session_id) @redis.get("#{STREAM_KEY_PREFIX}#{session_id}") end |
#has_any_local_streams? ⇒ Boolean
65 66 67 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 65 def has_any_local_streams? !@local_streams.empty? end |
#has_local_stream?(session_id) ⇒ Boolean
42 43 44 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 42 def has_local_stream?(session_id) @local_streams.key?(session_id) end |
#refresh_heartbeat(session_id) ⇒ Object
54 55 56 57 58 59 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 54 def refresh_heartbeat(session_id) @redis.multi do |multi| multi.set("#{HEARTBEAT_KEY_PREFIX}#{session_id}", Time.now.to_f, ex: @ttl) multi.expire("#{STREAM_KEY_PREFIX}#{session_id}", @ttl) end end |
#register_stream(session_id, stream) ⇒ Object
19 20 21 22 23 24 25 26 27 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 19 def register_stream(session_id, stream) @local_streams[session_id] = stream # Store stream registration in Redis with TTL @redis.multi do |multi| multi.set("#{STREAM_KEY_PREFIX}#{session_id}", @server_instance, ex: @ttl) multi.set("#{HEARTBEAT_KEY_PREFIX}#{session_id}", Time.now.to_f, ex: @ttl) end end |
#stream_active?(session_id) ⇒ Boolean
50 51 52 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 50 def stream_active?(session_id) @redis.exists("#{STREAM_KEY_PREFIX}#{session_id}") == 1 end |
#unregister_stream(session_id) ⇒ Object
29 30 31 32 33 34 35 36 |
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 29 def unregister_stream(session_id) @local_streams.delete(session_id) @redis.multi do |multi| multi.del("#{STREAM_KEY_PREFIX}#{session_id}") multi.del("#{HEARTBEAT_KEY_PREFIX}#{session_id}") end end |