Class: ActionMCP::Server::SolidMcpAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/action_mcp/server/solid_mcp_adapter.rb

Overview

Adapter for SolidMCP PubSub - optimized for MCP’s session-based messaging

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ SolidMcpAdapter

Returns a new instance of SolidMcpAdapter.



11
12
13
14
15
16
# File 'lib/action_mcp/server/solid_mcp_adapter.rb', line 11

# Builds an adapter with empty bookkeeping and no pubsub backend attached.
#
# @param options [Hash] adapter options, stored as given
def initialize(options = {})
  # session id => collection of subscription ids tracked for that session
  @session_callbacks = Concurrent::Map.new
  # subscription id => { channel:, session_id:, message_callback: }
  @subscriptions = Concurrent::Map.new
  @options = options
  # No backend yet; presumably created on demand elsewhere in the class -- confirm.
  @pubsub = nil
end

Instance Method Details

#broadcast(channel, message) ⇒ Object

Broadcast a message to a channel

Parameters:

  • channel (String)

    The channel name

  • message (String)

    The message to broadcast



95
96
97
98
99
100
101
102
103
# File 'lib/action_mcp/server/solid_mcp_adapter.rb', line 95

# Publishes +message+ to the session that +channel+ maps to.
#
# The session id and event type are derived first (via +extract_session_id+
# and +extract_event_type+), then the message is handed to the pubsub backend
# and the broadcast is logged.
#
# @param channel [String] the channel name
# @param message [String] the message to broadcast
# @return [Object] whatever +log_broadcast_event+ returns
def broadcast(channel, message)
  target_session = extract_session_id(channel)
  kind = extract_event_type(message)

  ensure_pubsub.broadcast(target_session, kind, message)
  log_broadcast_event(channel, message)
end

#has_subscribers?(channel) ⇒ Boolean

Check if a channel has subscribers

Parameters:

  • channel (String)

    The channel name

Returns:

  • (Boolean)

    True if channel has subscribers



108
109
110
# File 'lib/action_mcp/server/solid_mcp_adapter.rb', line 108

# Reports whether any tracked subscription was registered for +channel+.
#
# @param channel [String] the channel name
# @return [Boolean] true when at least one subscription entry has this channel
def has_subscribers?(channel)
  @subscriptions.each_value do |entry|
    return true if entry[:channel] == channel
  end

  false
end

#shutdown ⇒ Object

Shut down the adapter gracefully



120
121
122
123
# File 'lib/action_mcp/server/solid_mcp_adapter.rb', line 120

# Shuts the adapter down and forgets every tracked subscription.
#
# The backend (if one was created) is told to shut down and the reference is
# dropped. The subscription registries are cleared as well: entries left behind
# would keep +has_subscribers?+ reporting subscriptions that no backend serves,
# and a later +subscribe+ for the same session would find a non-empty id list
# and skip registering with the backend.
#
# @return [nil]
def shutdown
  @pubsub&.shutdown
  @pubsub = nil

  # Drop bookkeeping that belonged to the backend we just released.
  @subscriptions.clear
  @session_callbacks.clear

  nil
end

#subscribe(channel, message_callback, success_callback = nil) ⇒ String

Subscribe to a session’s messages

Parameters:

  • channel (String)

    The channel name (format: “action_mcp:session:SESSION_ID”)

  • message_callback (Proc)

    Callback for received messages

  • success_callback (Proc) (defaults to: nil)

    Callback for successful subscription

Returns:

  • (String)

    Subscription ID



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/action_mcp/server/solid_mcp_adapter.rb', line 23

# Registers +message_callback+ for the session behind +channel+ and returns a
# newly generated subscription id.
#
# The backend subscription is opened only when the session has no tracked
# subscription ids yet. Its handler passes each incoming message's +:data+ to
# every registered callback whose session id matches.
#
# @param channel [String] channel name; the session id comes from +extract_session_id+
# @param message_callback [Proc] called with +message[:data]+ for each delivery
# @param success_callback [Proc, nil] called with no arguments after registration
# @return [String] the subscription id
def subscribe(channel, message_callback, success_callback = nil)
  new_id = SecureRandom.uuid_v7
  sid = extract_session_id(channel)

  @subscriptions[new_id] = { channel: channel, session_id: sid, message_callback: message_callback }

  # Make sure the session has a list to track its subscription ids in.
  # NOTE(review): this lookup-then-store and the empty? check below are separate
  # steps, not one atomic operation -- confirm callers never race here.
  @session_callbacks[sid] = Concurrent::Array.new unless @session_callbacks[sid]

  # An empty list means this session has not been registered with the backend yet.
  if @session_callbacks[sid].empty?
    ensure_pubsub.subscribe(sid) do |incoming|
      @subscriptions.each_value do |entry|
        next unless entry[:session_id] == sid

        handler = entry[:message_callback]
        next unless handler

        begin
          handler.call(incoming[:data])
        rescue StandardError => e
          # Log and carry on so one failing callback does not block the others.
          log_error("Error in message callback: #{e.message}")
        end
      end
    end
  end

  @session_callbacks[sid] << new_id

  log_subscription_event(channel, "Subscribed", new_id)
  success_callback&.call

  new_id
end

#subscribed_to?(channel) ⇒ Boolean

Check if we’re subscribed to a channel

Parameters:

  • channel (String)

    The channel name

Returns:

  • (Boolean)

    True if we’re subscribed



115
116
117
# File 'lib/action_mcp/server/solid_mcp_adapter.rb', line 115

# Reports whether this adapter is subscribed to +channel+; answers exactly what
# +has_subscribers?+ answers for the same channel.
#
# @param channel [String] the channel name
# @return [Boolean] true if we are subscribed
def subscribed_to?(channel) = has_subscribers?(channel)

#unsubscribe(channel, callback = nil) ⇒ Object

Unsubscribe from a channel

Parameters:

  • channel (String)

    The channel name

  • callback (Proc) (defaults to: nil)

    Optional callback for unsubscribe completion



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/action_mcp/server/solid_mcp_adapter.rb', line 65

# Removes every subscription registered for +channel+.
#
# Matching entries are dropped from the subscription registry and from the
# session's tracked id list. When that list ends up empty, the session is
# unsubscribed from the pubsub backend and its list is discarded.
#
# @param channel [String] the channel name
# @param callback [Proc, nil] optional callback invoked once unsubscribing is done
# @return [Object, nil] the callback's return value, or nil when none was given
def unsubscribe(channel, callback = nil)
  sid = extract_session_id(channel)

  # Drop matching registry entries, remembering which ids went away.
  dropped = []
  @subscriptions.each do |sub_id, entry|
    next unless entry[:channel] == channel

    @subscriptions.delete(sub_id)
    dropped << sub_id
  end

  dropped.each { |sub_id| @session_callbacks[sid]&.delete(sub_id) }

  # Release the backend subscription only when the session has no ids left;
  # a session that was never tracked is skipped.
  if @session_callbacks[sid]&.empty?
    ensure_pubsub.unsubscribe(sid)
    @session_callbacks.delete(sid)
  end

  log_subscription_event(channel, "Unsubscribed")
  callback&.call
end