Class: ModelContextProtocol::Server::StreamableHttpTransport::StreamRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb

Constant Summary collapse

STREAM_KEY_PREFIX =
"stream:active:"
HEARTBEAT_KEY_PREFIX =
"stream:heartbeat:"
DEFAULT_TTL =

1 minute TTL for stream entries

60

Instance Method Summary collapse

Constructor Details

#initialize(redis_client, server_instance, ttl: DEFAULT_TTL) ⇒ StreamRegistry

Returns a new instance of StreamRegistry.



12
13
14
15
16
17
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 12

def initialize(redis_client, server_instance, ttl: DEFAULT_TTL)
  @redis = redis_client
  @server_instance = server_instance
  @ttl = ttl
  @local_streams = {} # Keep local reference for direct stream access
end

Instance Method Details

#cleanup_expired_streamsObject



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 69

def cleanup_expired_streams
  # Get all local stream session IDs
  local_session_ids = @local_streams.keys

  # Check which ones are still active in Redis
  pipeline_results = @redis.pipelined do |pipeline|
    local_session_ids.each do |session_id|
      pipeline.exists("#{STREAM_KEY_PREFIX}#{session_id}")
    end
  end

  # Remove expired streams from local storage
  expired_sessions = []
  local_session_ids.each_with_index do |session_id, index|
    if pipeline_results[index] == 0 # Stream expired in Redis
      @local_streams.delete(session_id)
      expired_sessions << session_id
    end
  end

  expired_sessions
end

#get_all_local_streamsObject



61
62
63
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 61

def get_all_local_streams
  @local_streams.dup
end

#get_local_stream(session_id) ⇒ Object



38
39
40
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 38

def get_local_stream(session_id)
  @local_streams[session_id]
end

#get_stale_streams(max_age_seconds = 90) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 92

def get_stale_streams(max_age_seconds = 90)
  current_time = Time.now.to_f
  stale_streams = []

  # Get all heartbeat keys
  heartbeat_keys = @redis.keys("#{HEARTBEAT_KEY_PREFIX}*")

  return stale_streams if heartbeat_keys.empty?

  # Get all heartbeat timestamps
  heartbeat_values = @redis.mget(heartbeat_keys)

  heartbeat_keys.each_with_index do |key, index|
    next unless heartbeat_values[index]

    session_id = key.sub(HEARTBEAT_KEY_PREFIX, "")
    last_heartbeat = heartbeat_values[index].to_f

    if current_time - last_heartbeat > max_age_seconds
      stale_streams << session_id
    end
  end

  stale_streams
end

#get_stream_server(session_id) ⇒ Object



46
47
48
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 46

def get_stream_server(session_id)
  @redis.get("#{STREAM_KEY_PREFIX}#{session_id}")
end

#has_any_local_streams?Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 65

def has_any_local_streams?
  !@local_streams.empty?
end

#has_local_stream?(session_id) ⇒ Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 42

def has_local_stream?(session_id)
  @local_streams.key?(session_id)
end

#refresh_heartbeat(session_id) ⇒ Object



54
55
56
57
58
59
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 54

def refresh_heartbeat(session_id)
  @redis.multi do |multi|
    multi.set("#{HEARTBEAT_KEY_PREFIX}#{session_id}", Time.now.to_f, ex: @ttl)
    multi.expire("#{STREAM_KEY_PREFIX}#{session_id}", @ttl)
  end
end

#register_stream(session_id, stream) ⇒ Object



19
20
21
22
23
24
25
26
27
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 19

def register_stream(session_id, stream)
  @local_streams[session_id] = stream

  # Store stream registration in Redis with TTL
  @redis.multi do |multi|
    multi.set("#{STREAM_KEY_PREFIX}#{session_id}", @server_instance, ex: @ttl)
    multi.set("#{HEARTBEAT_KEY_PREFIX}#{session_id}", Time.now.to_f, ex: @ttl)
  end
end

#stream_active?(session_id) ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 50

def stream_active?(session_id)
  @redis.exists("#{STREAM_KEY_PREFIX}#{session_id}") == 1
end

#unregister_stream(session_id) ⇒ Object



29
30
31
32
33
34
35
36
# File 'lib/model_context_protocol/server/streamable_http_transport/stream_registry.rb', line 29

def unregister_stream(session_id)
  @local_streams.delete(session_id)

  @redis.multi do |multi|
    multi.del("#{STREAM_KEY_PREFIX}#{session_id}")
    multi.del("#{HEARTBEAT_KEY_PREFIX}#{session_id}")
  end
end