Class: ModelContextProtocol::Server::StreamableHttpTransport::SessionStore

Inherits:
Object
  • Object
show all
Defined in:
lib/model_context_protocol/server/streamable_http_transport/session_store.rb

Instance Method Summary collapse

Constructor Details

#initialize(redis_client, ttl: 3600) ⇒ SessionStore



8
9
10
11
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 8

def initialize(redis_client, ttl: 3600)
  @redis = redis_client
  @ttl = ttl
end

Instance Method Details

#cleanup_session(session_id) ⇒ Object



67
68
69
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 67

def cleanup_session(session_id)
  @redis.del("session:#{session_id}")
end

#create_session(session_id, data) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 13

def create_session(session_id, data)
  session_data = {
    id: session_id,
    server_instance: data[:server_instance],
    context: data[:context] || {},
    created_at: data[:created_at] || Time.now.to_f,
    last_activity: Time.now.to_f,
    active_stream: false
  }

  @redis.hset("session:#{session_id}", session_data.transform_values(&:to_json))
  @redis.expire("session:#{session_id}", @ttl)
  session_id
end

#get_all_active_sessionsObject



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 107

def get_all_active_sessions
  keys = @redis.keys("session:*")
  active_sessions = []

  keys.each do |key|
    session_id = key.sub("session:", "")
    if session_has_active_stream?(session_id)
      active_sessions << session_id
    end
  end

  active_sessions
end

#get_session_context(session_id) ⇒ Object



62
63
64
65
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 62

def get_session_context(session_id)
  context_data = @redis.hget("session:#{session_id}", "context")
  context_data ? JSON.parse(context_data) : {}
end

#get_session_server(session_id) ⇒ Object



48
49
50
51
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 48

def get_session_server(session_id)
  server_data = @redis.hget("session:#{session_id}", "stream_server")
  server_data ? JSON.parse(server_data) : nil
end

#get_sessions_with_messagesObject



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 90

def get_sessions_with_messages
  session_keys = @redis.keys("session:*")
  sessions_with_messages = []

  session_keys.each do |key|
    session_id = key.sub("session:", "")
    queue = SessionMessageQueue.new(@redis, session_id, ttl: @ttl)
    if queue.has_messages?
      sessions_with_messages << session_id
    end
  end

  sessions_with_messages
rescue
  []
end

#mark_stream_active(session_id, server_instance) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 28

def mark_stream_active(session_id, server_instance)
  @redis.multi do |multi|
    multi.hset("session:#{session_id}",
      "active_stream", true.to_json,
      "stream_server", server_instance.to_json,
      "last_activity", Time.now.to_f.to_json)
    multi.expire("session:#{session_id}", @ttl)
  end
end

#mark_stream_inactive(session_id) ⇒ Object



38
39
40
41
42
43
44
45
46
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 38

def mark_stream_inactive(session_id)
  @redis.multi do |multi|
    multi.hset("session:#{session_id}",
      "active_stream", false.to_json,
      "stream_server", nil.to_json,
      "last_activity", Time.now.to_f.to_json)
    multi.expire("session:#{session_id}", @ttl)
  end
end

#poll_messages_for_session(session_id) ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 81

def poll_messages_for_session(session_id)
  return [] unless session_exists?(session_id)

  queue = SessionMessageQueue.new(@redis, session_id, ttl: @ttl)
  queue.poll_messages
rescue
  []
end

#queue_message_for_session(session_id, message) ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 71

def queue_message_for_session(session_id, message)
  return false unless session_exists?(session_id)

  queue = SessionMessageQueue.new(@redis, session_id, ttl: @ttl)
  queue.push_message(message)
  true
rescue
  false
end

#session_exists?(session_id) ⇒ Boolean



53
54
55
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 53

def session_exists?(session_id)
  @redis.exists("session:#{session_id}") == 1
end

#session_has_active_stream?(session_id) ⇒ Boolean



57
58
59
60
# File 'lib/model_context_protocol/server/streamable_http_transport/session_store.rb', line 57

def session_has_active_stream?(session_id)
  stream_data = @redis.hget("session:#{session_id}", "active_stream")
  stream_data ? JSON.parse(stream_data) : false
end