Class: ActionMCP::Server::SimplePubSub

Inherits:
Object
  • Object
show all
Defined in:
lib/action_mcp/server/simple_pub_sub.rb

Overview

Simple in-memory PubSub implementation for testing and development

Constant Summary collapse

DEFAULT_MIN_THREADS =

Thread pool configuration

5
DEFAULT_MAX_THREADS =
10
DEFAULT_MAX_QUEUE =
100
DEFAULT_THREAD_TIMEOUT =

seconds

60

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ SimplePubSub

Returns a new instance of SimplePubSub.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 18

def initialize(options = {})
  @subscriptions = Concurrent::Map.new
  @channels = Concurrent::Map.new

  # Initialize thread pool for callbacks
  pool_options = {
    min_threads: options["min_threads"] || DEFAULT_MIN_THREADS,
    max_threads: options["max_threads"] || DEFAULT_MAX_THREADS,
    max_queue: options["max_queue"] || DEFAULT_MAX_QUEUE,
    fallback_policy: :caller_runs, # Execute in the caller's thread if queue is full
    idletime: DEFAULT_THREAD_TIMEOUT
  }
  @thread_pool = Concurrent::ThreadPoolExecutor.new(pool_options)
end

Instance Method Details

#broadcast(channel, message) ⇒ Object

Broadcast a message to a channel

Parameters:

  • channel (String)

    The channel name

  • message (String)

    The message to broadcast



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 84

def broadcast(channel, message)
  subscription_ids = @channels[channel] || []
  return if subscription_ids.empty?

  log_broadcast_event(channel, message)

  subscription_ids.each do |subscription_id|
    subscription = @subscriptions[subscription_id]
    next unless subscription && subscription[:message_callback]

    @thread_pool.post do
      subscription[:message_callback].call(message)
    rescue StandardError => e
      log_error("Error in message callback: #{e.message}\n#{e.backtrace.join("\n")}")
    end
  end
end

#has_subscribers?(channel) ⇒ Boolean

Check if a channel has subscribers

Parameters:

  • channel (String)

    The channel name

Returns:

  • (Boolean)

    True if channel has subscribers



105
106
107
108
109
110
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 105

def has_subscribers?(channel)
  subscribers = @channels[channel]
  return false unless subscribers

  !subscribers.empty?
end

#shutdownObject

Shut down the thread pool gracefully



113
114
115
116
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 113

def shutdown
  @thread_pool.shutdown
  @thread_pool.wait_for_termination(5) # Wait up to 5 seconds for tasks to complete
end

#subscribe(channel, message_callback, success_callback = nil) ⇒ String

Subscribe to a channel

Parameters:

  • channel (String)

    The channel name

  • message_callback (Proc)

    Callback for received messages

  • success_callback (Proc) (defaults to: nil)

    Callback for successful subscription

Returns:

  • (String)

    Subscription ID



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 38

def subscribe(channel, message_callback, success_callback = nil)
  subscription_id = SecureRandom.uuid

  @subscriptions[subscription_id] = {
    channel: channel,
    message_callback: message_callback
  }

  @channels[channel] ||= Concurrent::Array.new
  @channels[channel] << subscription_id

  log_subscription_event(channel, "Subscribed", subscription_id)
  success_callback&.call

  subscription_id
end

#subscribed_to?(channel) ⇒ Boolean

Check if we’re already subscribed to a channel

Parameters:

  • channel (String)

    The channel name

Returns:

  • (Boolean)

    True if we’re already subscribed



58
59
60
61
62
63
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 58

def subscribed_to?(channel)
  channel_subs = @channels[channel]
  return false if channel_subs.nil?

  !channel_subs.empty?
end

#unsubscribe(channel, callback = nil) ⇒ Object

Unsubscribe from a channel

Parameters:

  • channel (String)

    The channel name

  • callback (Proc) (defaults to: nil)

    Optional callback for unsubscribe completion



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/action_mcp/server/simple_pub_sub.rb', line 68

def unsubscribe(channel, callback = nil)
  # Remove our subscriptions
  subscription_ids = @channels[channel] || []
  subscription_ids.each do |subscription_id|
    @subscriptions.delete(subscription_id)
  end

  @channels.delete(channel)

  log_subscription_event(channel, "Unsubscribed")
  callback&.call
end