Class: ModelContextProtocol::Server::StreamableHttpTransport::NotificationQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb

Constant Summary collapse

QUEUE_KEY_PREFIX =
"notifications:"
DEFAULT_MAX_SIZE =
1000

Instance Method Summary collapse

Constructor Details

#initialize(redis_client, server_instance, max_size: DEFAULT_MAX_SIZE) ⇒ NotificationQueue

Returns a new instance of NotificationQueue.



9
10
11
12
13
14
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 9

def initialize(redis_client, server_instance, max_size: DEFAULT_MAX_SIZE)
  @redis = redis_client
  @server_instance = server_instance
  @queue_key = "#{QUEUE_KEY_PREFIX}#{server_instance}"
  @max_size = max_size
end

Instance Method Details

#clearObject



62
63
64
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 62

def clear
  @redis.del(@queue_key)
end

#empty?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 58

def empty?
  size == 0
end

#peek_allObject



45
46
47
48
49
50
51
52
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 45

def peek_all
  notification_jsons = @redis.lrange(@queue_key, 0, -1)
  return [] if notification_jsons.empty?

  notification_jsons.reverse.map do |notification_json|
    JSON.parse(notification_json)
  end
end

#popObject



25
26
27
28
29
30
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 25

def pop
  notification_json = @redis.rpop(@queue_key)
  return nil unless notification_json

  JSON.parse(notification_json)
end

#pop_allObject



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 32

def pop_all
  notification_jsons = @redis.multi do |multi|
    multi.lrange(@queue_key, 0, -1)
    multi.del(@queue_key)
  end.first

  return [] if notification_jsons.empty?

  notification_jsons.reverse.map do |notification_json|
    JSON.parse(notification_json)
  end
end

#push(notification) ⇒ Object



16
17
18
19
20
21
22
23
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 16

def push(notification)
  notification_json = notification.to_json

  @redis.multi do |multi|
    multi.lpush(@queue_key, notification_json)
    multi.ltrim(@queue_key, 0, @max_size - 1)
  end
end

#push_bulk(notifications) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 66

def push_bulk(notifications)
  return if notifications.empty?

  notification_jsons = notifications.map(&:to_json)

  @redis.multi do |multi|
    notification_jsons.each do |json|
      multi.lpush(@queue_key, json)
    end
    multi.ltrim(@queue_key, 0, @max_size - 1)
  end
end

#sizeObject



54
55
56
# File 'lib/model_context_protocol/server/streamable_http_transport/notification_queue.rb', line 54

def size
  @redis.llen(@queue_key)
end