Class: ModelContextProtocol::Server::StreamableHttpTransport::SessionMessageQueue
- Inherits: Object
- Object
- ModelContextProtocol::Server::StreamableHttpTransport::SessionMessageQueue
- Defined in:
- lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb
Constant Summary collapse
- QUEUE_KEY_PREFIX = "session_messages:"
- LOCK_KEY_PREFIX = "session_lock:"
- DEFAULT_TTL = 3600 (1 hour)
- MAX_MESSAGES = 1000
- LOCK_TIMEOUT = 5 (seconds)
Instance Method Summary collapse
- #clear ⇒ Object
- #has_messages? ⇒ Boolean
- #initialize(redis_client, session_id, ttl: DEFAULT_TTL) ⇒ SessionMessageQueue (constructor): A new instance of SessionMessageQueue.
- #message_count ⇒ Object
- #peek_messages ⇒ Object
- #poll_messages ⇒ Object
- #push_message(message) ⇒ Object
- #push_messages(messages) ⇒ Object
- #with_lock(timeout: LOCK_TIMEOUT, &block) ⇒ Object
Constructor Details
#initialize(redis_client, session_id, ttl: DEFAULT_TTL) ⇒ SessionMessageQueue
Returns a new instance of SessionMessageQueue.
13 14 15 16 17 18 19 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 13

# Binds a queue handle to a single session.
#
# @param redis_client [Object] client object that receives every Redis call made by this queue
# @param session_id [Object] identifier interpolated into the queue and lock key names
#   (presumably a String -- confirm against callers)
# @param ttl [Integer] expiry, in seconds, applied to the queue key whenever messages are pushed
# @return [SessionMessageQueue] a new instance of SessionMessageQueue
def initialize(redis_client, session_id, ttl: DEFAULT_TTL)
  @redis = redis_client
  @session_id = session_id
  # Queue and lock live under separate prefixes so they never collide for one session.
  @queue_key = "#{QUEUE_KEY_PREFIX}#{session_id}"
  @lock_key = "#{LOCK_KEY_PREFIX}#{session_id}"
  @ttl = ttl
end
Instance Method Details
#clear ⇒ Object
80 81 82 83 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 80

# Deletes the session's queue key.
#
# Best-effort: any StandardError raised by the client is swallowed and nil is
# returned instead.
#
# @return [Object, nil] whatever the client's +del+ returns, or nil on failure
def clear
  @redis.del(@queue_key)
rescue StandardError
  nil
end
#has_messages? ⇒ Boolean
68 69 70 71 72 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 68

# Reports whether the session's queue key currently exists.
#
# Any StandardError raised while asking the client is treated as "no messages".
#
# @return [Boolean] true when the client's +exists+ count is greater than zero
def has_messages?
  @redis.exists(@queue_key).positive?
rescue StandardError
  false
end
#message_count ⇒ Object
74 75 76 77 78 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 74

# Returns the length of the session's queue list.
#
# Any StandardError raised by the client is swallowed and reported as 0.
#
# @return [Integer] the value of the client's +llen+ for the queue key, or 0 on failure
def message_count
  @redis.llen(@queue_key)
rescue StandardError
  0
end
#peek_messages ⇒ Object
61 62 63 64 65 66 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 61

# Reads every queued message without removing anything from the queue.
#
# Pushes use LPUSH, so the list is stored newest-first; the result is reversed
# to hand messages back oldest-first. Any StandardError (from the client or
# from deserialization) yields an empty array.
#
# NOTE(review): the extracted listing dropped the identifiers on this method;
# `deserialize_message` is taken from the poll_messages listing -- confirm
# against the source file.
#
# @return [Array] deserialized messages, oldest first; [] on failure
def peek_messages
  messages = @redis.lrange(@queue_key, 0, -1)
  messages.reverse.map { |json| deserialize_message(json) }
rescue StandardError
  []
end
#poll_messages ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 45

# Drains the queue: returns every queued message and deletes the queue key.
#
# The read and the delete run inside one Lua script so both happen in a single
# EVAL call. The list is stored newest-first (LPUSH), so it is reversed to
# return messages oldest-first. Any StandardError yields an empty array.
#
# @return [Array] deserialized messages, oldest first; [] when the queue is
#   empty or on failure
def poll_messages
  lua_script = <<~LUA
    local messages = redis.call('lrange', KEYS[1], 0, -1)
    if #messages > 0 then
      redis.call('del', KEYS[1])
    end
    return messages
  LUA

  messages = @redis.eval(lua_script, keys: [@queue_key])
  return [] unless messages && !messages.empty?

  messages.reverse.map { |json| deserialize_message(json) }
rescue StandardError
  []
end
#push_message(message) ⇒ Object
21 22 23 24 25 26 27 28 29 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 21

# Appends one message to the session's queue.
#
# The push, TTL refresh and trim are issued inside a single MULTI block. The
# trim keeps list indexes 0..MAX_MESSAGES-1; since new entries are LPUSHed at
# index 0, the oldest entries are the ones dropped when the cap is exceeded.
#
# NOTE(review): the extracted listing dropped the local and helper names here;
# `serialize_message` is assumed as the counterpart of the `deserialize_message`
# helper seen in poll_messages -- confirm against the source file.
#
# @param message [Object] message to serialize and enqueue
# @return [Object] whatever the client's +multi+ returns
def push_message(message)
  message_json = serialize_message(message)

  @redis.multi do |multi|
    multi.lpush(@queue_key, message_json)
    multi.expire(@queue_key, @ttl)
    multi.ltrim(@queue_key, 0, MAX_MESSAGES - 1)
  end
end
#push_messages(messages) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 31

# Appends several messages to the session's queue in one MULTI block.
#
# Each message is LPUSHed in the order given, then the key's TTL is refreshed
# and the list is trimmed to indexes 0..MAX_MESSAGES-1. Does nothing (and
# returns nil) when +messages+ is empty.
#
# NOTE(review): the extracted listing dropped the local and helper names here;
# `serialize_message` is assumed as the counterpart of the `deserialize_message`
# helper seen in poll_messages -- confirm against the source file.
#
# @param messages [Array] messages to serialize and enqueue, oldest first
# @return [Object, nil] whatever the client's +multi+ returns, or nil for empty input
def push_messages(messages)
  return if messages.empty?

  message_jsons = messages.map { |msg| serialize_message(msg) }

  @redis.multi do |multi|
    message_jsons.each do |json|
      multi.lpush(@queue_key, json)
    end
    multi.expire(@queue_key, @ttl)
    multi.ltrim(@queue_key, 0, MAX_MESSAGES - 1)
  end
end
#with_lock(timeout: LOCK_TIMEOUT, &block) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/model_context_protocol/server/streamable_http_transport/session_message_queue.rb', line 85

# Runs the given block while holding this session's lock key.
#
# The lock is taken with SET NX EX, so it is only acquired when the key is
# absent and it expires by itself after +timeout+ seconds. There is no
# waiting or retrying: if the key is already set the block is not run.
#
# @param timeout [Integer] lock expiry in seconds
# @yield executed only when the lock was acquired
# @return [Boolean] false when the lock could not be acquired; true once the
#   block has finished. Exceptions from the block propagate after the release.
def with_lock(timeout: LOCK_TIMEOUT, &block)
  # Random token identifying this holder, so the release cannot remove a lock
  # that expired and was re-acquired under a different token.
  lock_id = SecureRandom.hex(16)

  acquired = @redis.set(@lock_key, lock_id, nx: true, ex: timeout)
  return false unless acquired

  begin
    yield
  ensure
    # Compare-and-delete in one script: delete only if the key still holds our token.
    lua_script = <<~LUA
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    LUA
    @redis.eval(lua_script, keys: [@lock_key], argv: [lock_id])
  end

  true
end