Class: ActionMCP::Server::SimplePubSub
- Inherits:
-
Object
- Object
- ActionMCP::Server::SimplePubSub
- Defined in:
- lib/action_mcp/server/simple_pub_sub.rb
Overview
Simple in-memory PubSub implementation for testing and development
Constant Summary collapse
- DEFAULT_MIN_THREADS =
Thread pool configuration
5
- DEFAULT_MAX_THREADS =
10
- DEFAULT_MAX_QUEUE =
100
- DEFAULT_THREAD_TIMEOUT =
seconds
60
Instance Method Summary collapse
-
#broadcast(channel, message) ⇒ Object
Broadcast a message to a channel.
-
#has_subscribers?(channel) ⇒ Boolean
Check if a channel has subscribers.
-
#initialize(options = {}) ⇒ SimplePubSub
constructor
A new instance of SimplePubSub.
-
#shutdown ⇒ Object
Shut down the thread pool gracefully.
-
#subscribe(channel, message_callback, success_callback = nil) ⇒ String
Subscribe to a channel.
-
#subscribed_to?(channel) ⇒ Boolean
Check if we’re already subscribed to a channel.
-
#unsubscribe(channel, callback = nil) ⇒ Object
Unsubscribe from a channel.
Constructor Details
#initialize(options = {}) ⇒ SimplePubSub
Returns a new instance of SimplePubSub.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 18 def initialize( = {}) @subscriptions = Concurrent::Map.new @channels = Concurrent::Map.new # Initialize thread pool for callbacks = { min_threads: ["min_threads"] || DEFAULT_MIN_THREADS, max_threads: ["max_threads"] || DEFAULT_MAX_THREADS, max_queue: ["max_queue"] || DEFAULT_MAX_QUEUE, fallback_policy: :caller_runs, # Execute in the caller's thread if queue is full idletime: DEFAULT_THREAD_TIMEOUT } @thread_pool = Concurrent::ThreadPoolExecutor.new() end |
Instance Method Details
#broadcast(channel, message) ⇒ Object
Broadcast a message to a channel
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 84 def broadcast(channel, ) subscription_ids = @channels[channel] || [] return if subscription_ids.empty? log_broadcast_event(channel, ) subscription_ids.each do |subscription_id| subscription = @subscriptions[subscription_id] next unless subscription && subscription[:message_callback] @thread_pool.post do subscription[:message_callback].call() rescue StandardError => e log_error("Error in message callback: #{e.}\n#{e.backtrace.join("\n")}") end end end |
#has_subscribers?(channel) ⇒ Boolean
Check if a channel has subscribers
105 106 107 108 109 110 |
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 105 def has_subscribers?(channel) subscribers = @channels[channel] return false unless subscribers !subscribers.empty? end |
#shutdown ⇒ Object
Shut down the thread pool gracefully
113 114 115 116 |
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 113 def shutdown @thread_pool.shutdown @thread_pool.wait_for_termination(5) # Wait up to 5 seconds for tasks to complete end |
#subscribe(channel, message_callback, success_callback = nil) ⇒ String
Subscribe to a channel
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 38 def subscribe(channel, , success_callback = nil) subscription_id = SecureRandom.uuid @subscriptions[subscription_id] = { channel: channel, message_callback: } @channels[channel] ||= Concurrent::Array.new @channels[channel] << subscription_id log_subscription_event(channel, "Subscribed", subscription_id) success_callback&.call subscription_id end |
#subscribed_to?(channel) ⇒ Boolean
Check if we’re already subscribed to a channel
58 59 60 61 62 63 |
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 58 def subscribed_to?(channel) channel_subs = @channels[channel] return false if channel_subs.nil? !channel_subs.empty? end |
#unsubscribe(channel, callback = nil) ⇒ Object
Unsubscribe from a channel
68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 68 def unsubscribe(channel, callback = nil) # Remove our subscriptions subscription_ids = @channels[channel] || [] subscription_ids.each do |subscription_id| @subscriptions.delete(subscription_id) end @channels.delete(channel) log_subscription_event(channel, "Unsubscribed") callback&.call end |