Class: ModelContextProtocol::Server::StreamableHttpTransport::SessionStore
- Inherits:
-
Object
- Object
- ModelContextProtocol::Server::StreamableHttpTransport::SessionStore
- Defined in:
- lib/model_context_protocol/server/streamable_http_transport/session_store.rb
Instance Method Summary
- #cleanup_session(session_id) ⇒ Object
- #create_session(session_id, data) ⇒ Object
- #get_all_active_sessions ⇒ Object
- #get_session_context(session_id) ⇒ Object
- #get_session_server(session_id) ⇒ Object
- #get_sessions_with_messages ⇒ Object
- #initialize(redis_client, ttl: 3600) ⇒ SessionStore (constructor): A new instance of SessionStore.
- #mark_stream_active(session_id, server_instance) ⇒ Object
- #mark_stream_inactive(session_id) ⇒ Object
- #poll_messages_for_session(session_id) ⇒ Object
- #queue_message_for_session(session_id, message) ⇒ Object
- #session_exists?(session_id) ⇒ Boolean
- #session_has_active_stream?(session_id) ⇒ Boolean
Constructor Details
#initialize(redis_client, ttl: 3600) ⇒ SessionStore
8 9 10 11 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 8
#
# Builds a session store on top of the given Redis client.
#
# @param redis_client [Object] client that receives every Redis command issued by this store
# @param ttl [Integer] expiry handed to `expire` whenever a session key is written (default 3600)
def initialize(redis_client, ttl: 3600)
  @redis = redis_client
  @ttl = ttl
end
Instance Method Details
#cleanup_session(session_id) ⇒ Object
67 68 69 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 67
#
# Deletes the "session:<id>" key for the given session.
#
# @param session_id [String] identifier interpolated into the Redis key
# @return [Object] whatever the client's `del` returns
def cleanup_session(session_id)
  @redis.del("session:#{session_id}")
end
#create_session(session_id, data) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 13
#
# Writes a new session hash under "session:<id>" and sets its expiry.
#
# Every field value is stored as JSON. The hash write and the expiry are sent
# in a single MULTI transaction so a session cannot be left without a TTL if
# the second command never runs.
#
# @param session_id [String] identifier for the session
# @param data [Hash] reads :server_instance, :context (defaults to {}) and
#   :created_at (defaults to the current time as a Float)
# @return [String] the session_id that was passed in
def create_session(session_id, data)
  key = "session:#{session_id}"
  now = Time.now.to_f

  fields = {
    id: session_id,
    server_instance: data[:server_instance],
    context: data[:context] || {},
    created_at: data[:created_at] || now,
    last_activity: now,
    active_stream: false
  }.transform_values(&:to_json)

  @redis.multi do |multi|
    multi.hset(key, fields)
    multi.expire(key, @ttl)
  end

  session_id
end
#get_all_active_sessions ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 107
#
# Lists the ids of every session whose "active_stream" flag is set.
#
# NOTE(review): KEYS walks the whole keyspace; the Redis documentation
# recommends SCAN for production workloads — consider switching if the
# client in use supports it.
#
# @return [Array<String>] session ids (the key with its "session:" prefix removed)
def get_all_active_sessions
  @redis.keys("session:*")
    .map { |key| key.delete_prefix("session:") }
    .select { |session_id| session_has_active_stream?(session_id) }
end
#get_session_context(session_id) ⇒ Object
62 63 64 65 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 62
#
# Reads the "context" field of a session hash and decodes it from JSON.
#
# @param session_id [String] identifier interpolated into the Redis key
# @return [Object] the parsed JSON value, or an empty Hash when the field is absent
def get_session_context(session_id)
  raw_context = @redis.hget("session:#{session_id}", "context")
  return {} unless raw_context

  JSON.parse(raw_context)
end
#get_session_server(session_id) ⇒ Object
48 49 50 51 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 48
#
# Reads the "stream_server" field of a session hash and decodes it from JSON.
#
# @param session_id [String] identifier interpolated into the Redis key
# @return [Object, nil] the parsed JSON value, or nil when the field is absent
def get_session_server(session_id)
  raw_server = @redis.hget("session:#{session_id}", "stream_server")
  return nil unless raw_server

  JSON.parse(raw_server)
end
#get_sessions_with_messages ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 90
#
# Lists the ids of every session whose message queue reports pending messages.
#
# Any StandardError raised while listing keys or querying a queue is swallowed
# and an empty Array is returned instead.
#
# NOTE(review): the identifiers in this listing were lost in extraction; the
# method name comes from the page's method summary, and `has_messages?` is a
# reconstruction — confirm it against SessionMessageQueue.
#
# @return [Array<String>] session ids (the key with "session:" removed)
def get_sessions_with_messages
  @redis.keys("session:*")
    .map { |key| key.sub("session:", "") }
    .select { |session_id| SessionMessageQueue.new(@redis, session_id, ttl: @ttl).has_messages? }
rescue
  []
end
#mark_stream_active(session_id, server_instance) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 28
#
# Flags a session as having an active stream and records which server owns it.
#
# Inside one MULTI transaction this sets "active_stream" to JSON true, stores
# the JSON-encoded server_instance under "stream_server", stamps
# "last_activity" with the current time, and re-applies the TTL to the key.
#
# @param session_id [String] identifier interpolated into the Redis key
# @param server_instance [Object] value serialised with `to_json` into "stream_server"
# @return [Object] whatever the client's `multi` returns
def mark_stream_active(session_id, server_instance)
  key = "session:#{session_id}"

  @redis.multi do |multi|
    multi.hset(key,
      "active_stream", true.to_json,
      "stream_server", server_instance.to_json,
      "last_activity", Time.now.to_f.to_json)
    multi.expire(key, @ttl)
  end
end
#mark_stream_inactive(session_id) ⇒ Object
38 39 40 41 42 43 44 45 46 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 38
#
# Flags a session as no longer having an active stream.
#
# Inside one MULTI transaction this sets "active_stream" to JSON false,
# overwrites "stream_server" with JSON null, stamps "last_activity" with the
# current time, and re-applies the TTL to the key.
#
# @param session_id [String] identifier interpolated into the Redis key
# @return [Object] whatever the client's `multi` returns
def mark_stream_inactive(session_id)
  key = "session:#{session_id}"

  @redis.multi do |multi|
    multi.hset(key,
      "active_stream", false.to_json,
      "stream_server", nil.to_json,
      "last_activity", Time.now.to_f.to_json)
    multi.expire(key, @ttl)
  end
end
#poll_messages_for_session(session_id) ⇒ Object
81 82 83 84 85 86 87 88 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 81
#
# Fetches the queued messages for a session.
#
# Returns an empty Array when the session does not exist, and also when any
# StandardError is raised while polling.
#
# NOTE(review): the identifiers in this listing were lost in extraction; the
# method name comes from the page's method summary, and `poll_messages` is a
# reconstruction — confirm it against SessionMessageQueue.
#
# @param session_id [String] identifier of the session to poll
# @return [Object] whatever the queue returns, or [] on a missing session or error
def poll_messages_for_session(session_id)
  return [] unless session_exists?(session_id)

  SessionMessageQueue.new(@redis, session_id, ttl: @ttl).poll_messages
rescue
  []
end
#queue_message_for_session(session_id, message) ⇒ Object
71 72 73 74 75 76 77 78 79 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 71
#
# Appends a message to a session's queue.
#
# Returns false when the session does not exist, and also when any
# StandardError is raised while queueing.
#
# NOTE(review): the identifiers in this listing were lost in extraction; the
# method and parameter names come from the page's method summary, and
# `push_message` is a reconstruction — confirm it against SessionMessageQueue.
#
# @param session_id [String] identifier of the target session
# @param message [Object] payload handed to the queue
# @return [Boolean] true once the queue call returns, false otherwise
def queue_message_for_session(session_id, message)
  return false unless session_exists?(session_id)

  SessionMessageQueue.new(@redis, session_id, ttl: @ttl).push_message(message)
  true
rescue
  false
end
#session_exists?(session_id) ⇒ Boolean
53 54 55 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 53
#
# Checks whether the "session:<id>" key is present in Redis.
#
# @param session_id [String] identifier interpolated into the Redis key
# @return [Boolean] true only when the client's `exists` returns 1
def session_exists?(session_id)
  @redis.exists("session:#{session_id}") == 1
end
#session_has_active_stream?(session_id) ⇒ Boolean
57 58 59 60 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 57
#
# Reads the "active_stream" field of a session hash and decodes it from JSON.
#
# @param session_id [String] identifier interpolated into the Redis key
# @return [Object] the parsed JSON value, or false when the field is absent
def session_has_active_stream?(session_id)
  raw_flag = @redis.hget("session:#{session_id}", "active_stream")
  return false unless raw_flag

  JSON.parse(raw_flag)
end