Class: ModelContextProtocol::Server::StreamableHttpTransport::SessionMessageQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb

Constant Summary collapse

QUEUE_KEY_PREFIX =
"session_messages:"
LOCK_KEY_PREFIX =
"session_lock:"
DEFAULT_TTL =
3600 # 1 hour, in seconds
MAX_MESSAGES =
1000
LOCK_TIMEOUT =
5 # seconds

Instance Method Summary collapse

Constructor Details

#initialize(redis_client, session_id, ttl: DEFAULT_TTL) ⇒ SessionMessageQueue

Returns a new instance of SessionMessageQueue.



13
14
15
16
17
18
19
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 13

# Builds a queue handle bound to a single session.
#
# @param redis_client [Object] client used for every Redis call made by this queue
# @param session_id [#to_s] session identifier; interpolated into both Redis keys
# @param ttl [Integer] expiry applied to the queue key on each push
#   (defaults to DEFAULT_TTL)
def initialize(redis_client, session_id, ttl: DEFAULT_TTL)
  @redis, @session_id, @ttl = redis_client, session_id, ttl

  # Both keys are derived once here so later calls never rebuild them.
  @lock_key = "#{LOCK_KEY_PREFIX}#{session_id}"
  @queue_key = "#{QUEUE_KEY_PREFIX}#{session_id}"
end

Instance Method Details

#clear ⇒ Object



80
81
82
83
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 80

# Deletes this session's queue key from Redis.
#
# Cleanup is best-effort: any StandardError raised by the client is
# swallowed and +nil+ is returned instead.
#
# @return [Object, nil] whatever the client's +del+ returns, or nil on error
def clear
  @redis.del(@queue_key)
rescue StandardError
  # Deliberately ignored; a failed cleanup must not propagate to the caller.
  nil
end

#has_messages? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
71
72
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 68

# Reports whether the queue key currently exists in Redis.
#
# @return [Boolean] true when the client's +exists+ result is greater than
#   zero; false otherwise, and also false when any StandardError is raised
#   (including by the comparison itself)
def has_messages?
  existing_keys = @redis.exists(@queue_key)
  existing_keys > 0
rescue StandardError
  false
end

#message_count ⇒ Object



74
75
76
77
78
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 74

# Returns the length of the queue list as reported by the client's +llen+.
#
# @return [Object] the +llen+ result, or 0 when any StandardError is raised
def message_count
  queue_length = @redis.llen(@queue_key)
  queue_length
rescue StandardError
  0
end

#peek_messages ⇒ Object



61
62
63
64
65
66
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 61

# Reads every queued message without removing anything from Redis.
#
# The list is fetched in full (+lrange 0..-1+) and walked back-to-front, so
# the last list element comes first in the result. Each entry is passed
# through +deserialize_message+.
#
# @return [Array] deserialized messages, or an empty array when any
#   StandardError is raised (by the client or during deserialization)
def peek_messages
  stored = @redis.lrange(@queue_key, 0, -1)
  stored.reverse_each.map { |payload| deserialize_message(payload) }
rescue StandardError
  []
end

#poll_messages ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 45

# Drains the queue: reads every queued message and deletes the list.
#
# The read and the delete are sent to Redis as a single Lua script, so both
# happen in one server-side call rather than as two separate commands. The
# returned list is walked back-to-front and each entry is passed through
# +deserialize_message+.
#
# @return [Array] deserialized messages; an empty array when the queue is
#   empty, when the script returns nil, or when any StandardError is raised
def poll_messages
  lua_script = <<~LUA
    local messages = redis.call('lrange', KEYS[1], 0, -1)
    if #messages > 0 then
      redis.call('del', KEYS[1])
    end
    return messages
  LUA

  messages = @redis.eval(lua_script, keys: [@queue_key])
  return [] if messages.nil? || messages.empty?

  messages.reverse.map { |json| deserialize_message(json) }
rescue StandardError
  []
end

#push_message(message) ⇒ Object



21
22
23
24
25
26
27
28
29
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 21

# Enqueues one message for this session.
#
# The message is serialized with +serialize_message+, then pushed, the key's
# expiry is refreshed to the configured TTL, and the list is trimmed to at
# most MAX_MESSAGES entries. All three commands are issued inside one
# +multi+ block.
#
# @param message [Object] message accepted by +serialize_message+
# @return [Object] whatever the client's +multi+ returns
def push_message(message)
  payload = serialize_message(message)
  last_kept_index = MAX_MESSAGES - 1

  @redis.multi do |tx|
    tx.lpush(@queue_key, payload)
    tx.expire(@queue_key, @ttl)
    # lpush adds at the head, so trimming 0..last keeps the newest entries.
    tx.ltrim(@queue_key, 0, last_kept_index)
  end
end

#push_messages(messages) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 31

# Enqueues several messages for this session in one +multi+ block.
#
# Every message is serialized up front with +serialize_message+. Inside the
# transaction each payload is pushed in the given order, then the key's
# expiry is refreshed and the list is trimmed to at most MAX_MESSAGES.
#
# @param messages [Array] messages accepted by +serialize_message+
# @return [Object, nil] whatever the client's +multi+ returns; nil (without
#   touching Redis) when +messages+ is empty
def push_messages(messages)
  return if messages.empty?

  payloads = messages.map { |message| serialize_message(message) }
  last_kept_index = MAX_MESSAGES - 1

  @redis.multi do |tx|
    payloads.each { |payload| tx.lpush(@queue_key, payload) }
    tx.expire(@queue_key, @ttl)
    # lpush adds at the head, so trimming 0..last keeps the newest entries.
    tx.ltrim(@queue_key, 0, last_kept_index)
  end
end

#with_lock(timeout: LOCK_TIMEOUT, &block) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 85

# Runs the given block while holding this session's lock key.
#
# The lock is taken with a single +set+ using +nx: true+ and +ex: timeout+,
# storing a random token as the value. If the key is already set the method
# returns false without yielding. After the block finishes, whether normally
# or by raising, a Lua script deletes the key only if it still holds this
# call's token, so a lock taken by someone else is not removed.
#
# @param timeout [Integer] value passed as the lock key's +ex:+ expiry
#   (defaults to LOCK_TIMEOUT)
# @yield the work to perform while the lock is held
# @return [Boolean] true once the block has run; false when the lock could
#   not be acquired
def with_lock(timeout: LOCK_TIMEOUT, &block)
  lock_id = SecureRandom.hex(16)

  acquired = @redis.set(@lock_key, lock_id, nx: true, ex: timeout)
  return false unless acquired

  begin
    yield
  ensure
    # Compare-and-delete in one script so the check and the delete cannot
    # be separated by another client's command.
    lua_script = <<~LUA
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    LUA
    @redis.eval(lua_script, keys: [@lock_key], argv: [lock_id])
  end

  true
end