Class: FastMcp::Transports::RackTransport

Inherits:
BaseTransport show all
Defined in:
lib/mcp/transports/rack_transport.rb

Overview

Rack middleware transport for MCP. This transport can be mounted in any Rack-compatible web framework.

Direct Known Subclasses

AuthenticatedRackTransport

Constant Summary collapse

DEFAULT_PATH_PREFIX =

rubocop:disable Metrics/ClassLength

'/mcp'
# Origin allow-list the constructor falls back to when options[:allowed_origins]
# is nil. Covers the loopback host name plus the IPv4 and bracketed IPv6
# loopback literals.
DEFAULT_ALLOWED_ORIGINS =
['localhost', '127.0.0.1', '[::1]'].freeze
# IP allow-list the constructor falls back to when options[:allowed_ips] is nil:
# IPv4 loopback, IPv6 loopback, and the IPv4-mapped IPv6 form of 127.0.0.1.
DEFAULT_ALLOWED_IPS =
['127.0.0.1', '::1', '::ffff:127.0.0.1'].freeze
# NOTE(review): presumably the Rack env key under which the MCP server is
# exposed to downstream code; no use of it is visible in this section - confirm.
SERVER_ENV_KEY =
'fast_mcp.server'
# Frozen header set describing a Server-Sent Events response
# (Content-Type: text/event-stream) with caching disabled.
# NOTE(review): where these headers are applied is not visible in this section.
# NOTE(review): Access-Control-Allow-Origin is '*' here, independent of the
# allowed-origins list - confirm this is intended.
SSE_HEADERS =
{
  'Content-Type' => 'text/event-stream',
  'Cache-Control' => 'no-cache, no-store, must-revalidate',
  'Connection' => 'keep-alive',
  'X-Accel-Buffering' => 'no', # For Nginx
  'Access-Control-Allow-Origin' => '*', # Allow CORS
  'Access-Control-Allow-Methods' => 'GET, OPTIONS',
  'Access-Control-Allow-Headers' => 'Content-Type',
  'Access-Control-Max-Age' => '86400', # 24 hours
  'Keep-Alive' => 'timeout=600', # 10 minutes timeout
  'Pragma' => 'no-cache',
  'Expires' => '0'
}.freeze

Instance Attribute Summary collapse

Attributes inherited from BaseTransport

#logger, #server

Instance Method Summary collapse

Methods inherited from BaseTransport

#process_message

Constructor Details

#initialize(app, server, options = {}, &_block) ⇒ RackTransport

Returns a new instance of RackTransport.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/mcp/transports/rack_transport.rb', line 35

# Builds the middleware around a downstream Rack app and an MCP server.
#
# @param app [#call] the wrapped Rack application that receives non-MCP requests
# @param server [Object] the MCP server, handed to BaseTransport together with the logger
# @param options [Hash] optional settings:
#   :logger, :path_prefix, :messages_route, :sse_route,
#   :allowed_origins, :localhost_only, :allowed_ips
# @param _block [Proc] accepted for signature compatibility and ignored
def initialize(app, server, options = {}, &_block)
  super(server, logger: options[:logger])

  # Collaborators and lifecycle state.
  @app = app
  @running = false

  # URL layout: <path_prefix>/<sse_route> and <path_prefix>/<messages_route>.
  @path_prefix = options[:path_prefix] || DEFAULT_PATH_PREFIX
  @sse_route = options[:sse_route] || 'sse'
  @messages_route = options[:messages_route] || 'messages'

  # Access restrictions. `fetch` is used for :localhost_only so that an
  # explicit `false` is honoured; only a missing key yields the default `true`.
  @localhost_only = options.fetch(:localhost_only, true)
  @allowed_origins = options[:allowed_origins] || DEFAULT_ALLOWED_ORIGINS
  @allowed_ips = options[:allowed_ips] || DEFAULT_ALLOWED_IPS

  # Registry of connected SSE clients and the lock guarding it.
  @sse_clients_mutex = Mutex.new
  @sse_clients = Concurrent::Hash.new

  @filtered_servers_cache = {}
end

Instance Attribute Details

#allowed_ips ⇒ Object (readonly)

Returns the value of attribute allowed_ips.



32
33
34
# File 'lib/mcp/transports/rack_transport.rb', line 32

# Reader for the IP allow-list stored in @allowed_ips. The constructor sets it
# from options[:allowed_ips], falling back to DEFAULT_ALLOWED_IPS.
#
# @return [Array<String>, Object] the configured allow-list
def allowed_ips
  @allowed_ips
end

#allowed_origins ⇒ Object (readonly)

Returns the value of attribute allowed_origins.



32
33
34
# File 'lib/mcp/transports/rack_transport.rb', line 32

# Reader for the origin allow-list stored in @allowed_origins. The constructor
# sets it from options[:allowed_origins], falling back to DEFAULT_ALLOWED_ORIGINS;
# `start` joins it into its debug log line.
#
# @return [Array, Object] the configured allow-list
def allowed_origins
  @allowed_origins
end

#app ⇒ Object (readonly)

Returns the value of attribute app.



32
33
34
# File 'lib/mcp/transports/rack_transport.rb', line 32

# Reader for the wrapped Rack application given to the constructor. `call`
# forwards every request that is not under the MCP path prefix to it.
#
# @return [#call] the downstream application
def app
  @app
end

#localhost_only ⇒ Object (readonly)

Returns the value of attribute localhost_only.



32
33
34
# File 'lib/mcp/transports/rack_transport.rb', line 32

# Reader for the localhost-only flag. The constructor takes it from
# options[:localhost_only] and defaults it to true when the key is absent.
# NOTE(review): the code that enforces this flag is not visible in this section.
#
# @return [Boolean, Object] the configured flag
def localhost_only
  @localhost_only
end

#messages_route ⇒ Object (readonly)

Returns the value of attribute messages_route.



32
33
34
# File 'lib/mcp/transports/rack_transport.rb', line 32

# Reader for the messages route name. The constructor sets it from
# options[:messages_route], falling back to 'messages'.
#
# @return [String, Object] the configured route name
def messages_route
  @messages_route
end

#path_prefix ⇒ Object (readonly)

Returns the value of attribute path_prefix.



32
33
34
# File 'lib/mcp/transports/rack_transport.rb', line 32

# Reader for the URL prefix that `call` compares request paths against. The
# constructor sets it from options[:path_prefix], falling back to
# DEFAULT_PATH_PREFIX.
#
# @return [String, Object] the configured prefix
def path_prefix
  @path_prefix
end

#sse_clients ⇒ Object (readonly)

Returns the value of attribute sse_clients.



32
33
34
# File 'lib/mcp/transports/rack_transport.rb', line 32

# Reader for the registry of connected SSE clients. The constructor creates it
# as a Concurrent::Hash; `register_sse_client` stores one entry per client id
# with the keys :stream, :connected_at and :mutex.
#
# @return [Concurrent::Hash] client id => client entry
def sse_clients
  @sse_clients
end

#sse_route ⇒ Object (readonly)

Returns the value of attribute sse_route.



32
33
34
# File 'lib/mcp/transports/rack_transport.rb', line 32

# Reader for the SSE route name. The constructor sets it from
# options[:sse_route], falling back to 'sse'.
#
# @return [String, Object] the configured route name
def sse_route
  @sse_route
end

Instance Method Details

#call(env) ⇒ Object

Rack call method



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/mcp/transports/rack_transport.rb', line 121

# Rack entry point. Requests whose path is the MCP prefix itself, or lies
# beneath it, are handled by this transport; everything else is passed to the
# wrapped application unchanged.
#
# The prefix is matched on a path-segment boundary: with the prefix '/mcp',
# both '/mcp' and '/mcp/sse' are MCP requests, while '/mcpfoo' and
# '/mcp-admin' belong to the wrapped application. A plain `start_with?` on the
# prefix would capture those unrelated routes as well.
#
# @param env [Hash] the Rack environment of the current request
# @return [Object] the result of handle_mcp_request for MCP paths, otherwise
#   the wrapped application's response
def call(env)
  request = Rack::Request.new(env)
  path = request.path
  @logger.debug("Rack request path: #{path}")

  # A prefix configured with a trailing slash already ends on a boundary.
  subtree = @path_prefix.end_with?('/') ? @path_prefix : "#{@path_prefix}/"

  if path == @path_prefix || path.start_with?(subtree)
    # The server is pointed at this transport on every MCP request.
    @logger.debug('Setting server transport to RackTransport')
    @server.transport = self
    handle_mcp_request(request, env)
  else
    # Pass through to the main application
    @app.call(env)
  end
end

#register_sse_client(client_id, stream, mutex = nil) ⇒ Object

Register a new SSE client



105
106
107
108
109
110
# File 'lib/mcp/transports/rack_transport.rb', line 105

# Adds an SSE client to the registry while holding the registry lock.
#
# @param client_id [Object] key under which the client is stored
# @param stream [Object] the stream later written to by send_message
# @param mutex [Mutex, nil] per-client write lock; a new Mutex is created when nil
# @return [Hash] the stored entry with the keys :stream, :connected_at and :mutex
def register_sse_client(client_id, stream, mutex = nil)
  @sse_clients_mutex.synchronize do
    @logger.info("Registering SSE client: #{client_id}")

    entry = { stream: stream, connected_at: Time.now }
    entry[:mutex] = mutex || Mutex.new
    @sse_clients[client_id] = entry
  end
end

#send_message(message) ⇒ Object

Send a message to all connected SSE clients



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/mcp/transports/rack_transport.rb', line 74

# Broadcasts a message to every registered SSE client.
#
# Strings are sent as given; any other object is serialised with JSON.generate.
# The payload is framed as one Server-Sent Event with one `data:` line per
# payload line. A payload containing line breaks (for example pretty-printed
# JSON passed in as a String) would otherwise be cut off at the first break,
# because an SSE parser treats everything after it as separate, unknown fields.
# Single-line payloads produce exactly "data: <payload>\n\n".
#
# Clients whose stream is nil or reports itself closed, or that have no mutex,
# are skipped. Clients whose write raises are unregistered afterwards.
#
# @param message [String, Object] the payload, or an object to serialise as JSON
# @return [Array] the ids of the clients that were unregistered
def send_message(message)
  json_message = message.is_a?(String) ? message : JSON.generate(message)
  @logger.debug("Broadcasting message to #{@sse_clients.size} SSE clients: #{json_message}")

  # CRLF, CR and LF all terminate a line in an event stream. The -1 limit
  # keeps trailing empty lines so the payload is reproduced faithfully.
  data_lines = json_message.split(/\r\n|\r|\n/, -1)
  data_lines << '' if data_lines.empty?
  event = "#{data_lines.map { |line| "data: #{line}" }.join("\n")}\n\n"

  clients_to_remove = []
  @sse_clients_mutex.synchronize do
    @sse_clients.each do |client_id, client|
      stream = client[:stream]
      mutex = client[:mutex]
      next if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) || mutex.nil?

      begin
        # The per-client mutex serialises writes to this one stream.
        mutex.synchronize do
          stream.write(event)
          stream.flush if stream.respond_to?(:flush)
        end
      rescue Errno::EPIPE, IOError => e
        @logger.info("Client #{client_id} disconnected: #{e.message}")
        clients_to_remove << client_id
      rescue StandardError => e
        @logger.error("Error sending message to client #{client_id}: #{e.message}")
        clients_to_remove << client_id
      end
    end
  end

  # Removal happens after the registry lock is released: unregister_sse_client
  # takes the same lock, and the hash must not change while it is iterated.
  clients_to_remove.each { |client_id| unregister_sse_client(client_id) }
end

#start ⇒ Object

Start the transport



51
52
53
54
55
# File 'lib/mcp/transports/rack_transport.rb', line 51

# Marks the transport as running and logs its configuration at debug level:
# first the path prefix, then the comma-separated list of allowed origins.
#
# @return [true]
def start
  @logger.debug("Starting Rack transport with path prefix: #{@path_prefix}")

  origin_list = allowed_origins.join(', ')
  @logger.debug("DNS rebinding protection enabled. Allowed origins: #{origin_list}")

  @running = true
end

#stop ⇒ Object

Stop the transport



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/mcp/transports/rack_transport.rb', line 58

# Stops the transport: clears the running flag, closes every registered SSE
# stream that can be closed, and empties the client registry.
#
# A stream is closed when it responds to `close` and does not report itself
# as already closed. `closed?` is only consulted when the stream implements
# it, matching the check in send_message; without that guard a stream that
# offers `close` but not `closed?` would raise and be left open.
#
# A failure while closing one stream is logged and does not prevent the
# remaining streams from being closed.
#
# @return [Object] the emptied client registry
def stop
  @logger.debug('Stopping Rack transport')
  @running = false

  # Close all SSE connections
  @sse_clients_mutex.synchronize do
    @sse_clients.each_value do |client|
      stream = client[:stream]
      next unless stream.respond_to?(:close)
      next if stream.respond_to?(:closed?) && stream.closed?

      stream.close
    rescue StandardError => e
      @logger.error("Error closing SSE connection: #{e.message}")
    end
    @sse_clients.clear
  end
end

#unregister_sse_client(client_id) ⇒ Object

Unregister an SSE client



113
114
115
116
117
118
# File 'lib/mcp/transports/rack_transport.rb', line 113

# Removes an SSE client from the registry while holding the registry lock.
# The removal is logged whether or not the id is currently registered.
#
# @param client_id [Object] key of the client to remove
# @return [Hash, nil] the removed entry, or nil when the id was not registered
def unregister_sse_client(client_id)
  removed = nil

  @sse_clients_mutex.synchronize do
    @logger.info("Unregistering SSE client: #{client_id}")
    removed = @sse_clients.delete(client_id)
  end

  removed
end