Class: MCPClient::ServerSSE

Inherits:
ServerBase show all
Defined in:
lib/mcp_client/server_sse.rb

Overview

Implementation of an MCP server that communicates via Server-Sent Events (SSE). Useful for communicating with remote MCP servers over HTTP.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from ServerBase

#on_notification, #ping

Constructor Details

#initialize(base_url:, headers: {}, read_timeout: 30, retries: 0, retry_backoff: 1, logger: nil) ⇒ ServerSSE

Returns a new instance of ServerSSE.

Parameters:

  • base_url (String)

    The base URL of the MCP server

  • headers (Hash) (defaults to: {})

    Additional headers to include in requests

  • read_timeout (Integer) (defaults to: 30)

    Read timeout in seconds (default: 30)

  • retries (Integer) (defaults to: 0)

    number of retry attempts on transient errors

  • retry_backoff (Numeric) (defaults to: 1)

    base delay in seconds for exponential backoff

  • logger (Logger, nil) (defaults to: nil)

    optional logger



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/mcp_client/server_sse.rb', line 22

# Prepare a client for an MCP server reached over Server-Sent Events.
#
# The visible body only assigns instance state (after calling the parent
# constructor).
#
# @param base_url [String] base URL of the MCP server; one trailing "/" is removed
# @param headers [Hash] additional request headers; the SSE headers set below
#   replace same-named keys
# @param read_timeout [Integer] read timeout in seconds
# @param retries [Integer] number of retry attempts on transient errors
# @param retry_backoff [Numeric] base delay in seconds for exponential backoff
# @param logger [Logger, nil] logger to use; when nil a WARN-level logger writing
#   to $stdout is created
def initialize(base_url:, headers: {}, read_timeout: 30, retries: 0, retry_backoff: 1, logger: nil)
  super()

  # Logging. Note that progname and formatter are overwritten even on a
  # caller-supplied logger.
  @logger = logger || Logger.new($stdout, level: Logger::WARN)
  @logger.progname = self.class.name
  @logger.formatter = proc { |level, _time, source, text| "#{level} [#{source}] #{text}\n" }

  # Endpoint and request settings. The base URL is used as given apart from
  # dropping a single trailing slash.
  sse_headers = {
    'Accept' => 'text/event-stream',
    'Cache-Control' => 'no-cache',
    'Connection' => 'keep-alive'
  }
  @base_url = base_url.delete_suffix('/')
  @headers = headers.merge(sse_headers)
  @read_timeout = read_timeout
  @max_retries = retries
  @retry_backoff = retry_backoff

  # Tool cache
  @tools = nil
  @tools_data = nil

  # JSON-RPC bookkeeping; @rpc_endpoint is the SSE-provided path for POST requests
  @rpc_endpoint = nil
  @request_id = 0
  @sse_results = {}

  # Synchronisation: one monitor plus a condition variable derived from it
  @mutex = Monitor.new
  @connection_cv = @mutex.new_cond

  # SSE stream and handshake state
  @buffer = ''
  @sse_connected = false
  @connection_established = false
  @initialized = false
  @auth_error = nil
  # Whether to use SSE transport; may be disabled if the handshake fails
  @use_sse = true
end

Instance Attribute Details

#base_url ⇒ Object (readonly)

Returns the value of attribute base_url.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

# Reader for @base_url, which the constructor stores with one trailing "/" removed.
#
# @return [String] the MCP server base URL
def base_url
  @base_url
end

#capabilities ⇒ Object (readonly)

Returns the value of attribute capabilities.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

# Reader for @capabilities.
#
# NOTE(review): @capabilities is not assigned by the constructor or any other
# method shown on this page, so this is nil until code outside this view sets
# it — presumably the initialization handshake; confirm.
#
# @return [Object, nil] the stored capabilities value
def capabilities
  @capabilities
end

#server_info ⇒ Object (readonly)

Returns the value of attribute server_info.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

# Reader for @server_info.
#
# NOTE(review): @server_info is not assigned by the constructor or any other
# method shown on this page, so this is nil until code outside this view sets
# it — presumably the initialization handshake; confirm.
#
# @return [Object, nil] the stored server info value
def server_info
  @server_info
end

#tools ⇒ Object (readonly)

Returns the value of attribute tools.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

# Reader for the cached tool list held in @tools.
#
# The constructor and #cleanup set @tools to nil; #list_tools fills it with the
# objects built by MCPClient::Tool.from_json. This reader never triggers a fetch.
#
# @return [Array<MCPClient::Tool>, nil] the cached tools, or nil when nothing is cached
def tools
  @tools
end

Instance Method Details

#call_tool(tool_name, parameters) ⇒ Object

Call a tool with the given parameters

Parameters:

  • tool_name (String)

    the name of the tool to call

  • parameters (Hash)

    the parameters to pass to the tool

Returns:

  • (Object)

    the result of the tool invocation

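Raises:

  • (MCPClient::Errors::TransportError)

    if the transport fails or the server response is not valid JSON

  • (MCPClient::Errors::ToolCallError)

    if any other error occurs while calling the tool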



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/mcp_client/server_sse.rb', line 104

# Invoke a tool on the server with a JSON-RPC "tools/call" request.
#
# @param tool_name [String] the name of the tool to call
# @param parameters [Hash] the arguments to pass to the tool
# @return [Object] whatever send_jsonrpc_request returns for the request
# @raise [MCPClient::Errors::TransportError] re-raised unchanged, or raised when
#   a JSON::ParserError occurs
# @raise [MCPClient::Errors::ToolCallError] for any other StandardError raised
#   while building or sending the request
def call_tool(tool_name, parameters)
  # Errors from ensure_initialized are deliberately outside the rescue below.
  ensure_initialized

  begin
    payload = {
      jsonrpc: '2.0',
      # Request ids are allocated under the monitor so they stay unique.
      id: @mutex.synchronize { @request_id += 1 },
      method: 'tools/call',
      params: { name: tool_name, arguments: parameters }
    }
    send_jsonrpc_request(payload)
  rescue MCPClient::Errors::TransportError
    # Transport failures already carry the right type; pass them through.
    raise
  rescue JSON::ParserError => error
    raise MCPClient::Errors::TransportError, "Invalid JSON response from server: #{error.message}"
  rescue StandardError => error
    raise MCPClient::Errors::ToolCallError, "Error calling tool '#{tool_name}': #{error.message}"
  end
end

#call_tool_streaming(tool_name, parameters) ⇒ Enumerator

Stream tool call fallback for SSE transport (yields single result)

Parameters:

  • tool_name (String)
  • parameters (Hash)

Returns:

  • (Enumerator)


60
61
62
63
64
# File 'lib/mcp_client/server_sse.rb', line 60

# Streaming fallback for the SSE transport: wraps #call_tool in an Enumerator
# that produces exactly one element. The tool is not called until the
# enumerator is actually iterated.
#
# @param tool_name [String] the name of the tool to call
# @param parameters [Hash] the arguments to pass to the tool
# @return [Enumerator] enumerator yielding the single #call_tool result
def call_tool_streaming(tool_name, parameters)
  Enumerator.new { |out| out << call_tool(tool_name, parameters) }
end

#cleanup ⇒ Object

Clean up the server connection. Properly closes HTTP connections and clears cached tools.



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/mcp_client/server_sse.rb', line 163

# Tear down the connection state held by this instance.
#
# While holding the instance monitor this kills the SSE thread (ignoring any
# StandardError from the kill), finishes the HTTP client when it reports
# started?, drops the cached tools and resets the connection flags.
# @auth_error is intentionally left untouched so the correct error can still
# be reported afterwards.
#
# @return [Object] the monitor block's value; not a meaningful result
def cleanup
  @mutex.synchronize do
    begin
      @sse_thread.kill if @sse_thread
    rescue StandardError
      nil # best effort: a failed kill must not abort the rest of the teardown
    end
    @sse_thread = nil

    client = @http_client
    if client
      # Only a started client can be finished; either way forget the client.
      client.finish if client.started?
      @http_client = nil
    end

    @tools = nil
    @connection_established = @sse_connected = false
  end
end

#connect ⇒ Boolean

Connect to the MCP server over HTTP/HTTPS with SSE

Returns:

  • (Boolean)

    true if connection was successful

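Raises:

  • (MCPClient::Errors::ConnectionError)

    if the connection cannot be established or authorization fails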



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/mcp_client/server_sse.rb', line 134

# Connect to the MCP server over SSE.
#
# Returns immediately when a connection is already established. Otherwise the
# SSE thread is started and the call waits for the connection, for at most
# the read timeout capped at 30 seconds (30 when no read timeout is set).
#
# On any StandardError the connection is cleaned up and a ConnectionError is
# raised whose message is, in order of preference: the stored @auth_error; the
# original message when a ConnectionError mentions 'Authorization failed'; or
# a "Failed to connect ..." message wrapping the original one.
#
# @return [Boolean] true if connection was successful
# @raise [MCPClient::Errors::ConnectionError] if the connection attempt fails
def connect
  return true if @mutex.synchronize { @connection_established }

  begin
    start_sse_thread
    wait_for_connection(timeout: [@read_timeout || 30, 30].min)
    true
  rescue StandardError => failure
    cleanup

    # A stored auth error is the most specific explanation, so it wins.
    stored_auth_error = @mutex.synchronize { @auth_error }
    raise MCPClient::Errors::ConnectionError, stored_auth_error if stored_auth_error

    # Only a ConnectionError carrying an authorization message is passed
    # through with its message unchanged; everything else gets the prefix.
    if failure.is_a?(MCPClient::Errors::ConnectionError) && failure.message.include?('Authorization failed')
      raise MCPClient::Errors::ConnectionError, failure.message
    end

    raise MCPClient::Errors::ConnectionError, "Failed to connect to MCP server at #{@base_url}: #{failure.message}"
  end
end

#list_tools ⇒ Array<MCPClient::Tool>

List all tools available from the MCP server

Returns:

  • (Array<MCPClient::Tool>)

    the tools available from the server

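Raises:

  • (MCPClient::Errors::TransportError)

    if the transport fails or the server response is not valid JSON

  • (MCPClient::Errors::ToolCallError)

    if any other error occurs while listing tools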



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/mcp_client/server_sse.rb', line 71

# List all tools available from the MCP server.
#
# The list is cached in @tools: once populated, later calls return the cached
# array without contacting the server (until #cleanup clears it).
#
# @return [Array<MCPClient::Tool>] the tools built from the server's tool list
# @raise [MCPClient::Errors::TransportError] re-raised unchanged, or raised when
#   a JSON::ParserError occurs
# @raise [MCPClient::Errors::ToolCallError] for any other StandardError raised
#   while fetching or building the list
def list_tools
  cached = @mutex.synchronize { @tools }
  return cached if cached

  # Errors from ensure_initialized are deliberately outside the rescue below.
  ensure_initialized

  begin
    tools_data = request_tools_list
    # Store AND return the freshly built list from the same critical section.
    # Re-reading @tools after releasing the lock could observe nil if another
    # thread ran #cleanup in between, making this method return nil.
    @mutex.synchronize do
      @tools = tools_data.map { |tool_data| MCPClient::Tool.from_json(tool_data) }
    end
  rescue MCPClient::Errors::TransportError
    # Transport failures already carry the right type; pass them through.
    raise
  rescue JSON::ParserError => e
    raise MCPClient::Errors::TransportError, "Invalid JSON response from server: #{e.message}"
  rescue StandardError => e
    raise MCPClient::Errors::ToolCallError, "Error listing tools: #{e.message}"
  end
end

#rpc_notify(method, params = {}) ⇒ void

This method returns an undefined value.

Send a JSON-RPC notification (no response expected)

Parameters:

  • method (String)

    JSON-RPC method name

  • params (Hash) (defaults to: {})

    parameters for the notification



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/mcp_client/server_sse.rb', line 201

# Send a JSON-RPC notification (a request without an id, so no response is expected).
#
# The notification is POSTed to the SSE-provided RPC endpoint through a Faraday
# connection that is created on first use and then reused. The instance
# headers are sent too, except 'Accept' and 'Cache-Control', which only apply
# to the event stream.
#
# @param method [String] JSON-RPC method name
# @param params [Hash] parameters for the notification
# @return [void]
# @raise [MCPClient::Errors::TransportError] for any StandardError, including
#   the ServerError raised here for a non-success HTTP status
def rpc_notify(method, params = {})
  ensure_initialized

  parsed = URI.parse(@base_url)
  origin = "#{parsed.scheme}://#{parsed.host}:#{parsed.port}"
  endpoint = @mutex.synchronize { @rpc_endpoint }

  @rpc_conn ||= Faraday.new(url: origin) do |builder|
    builder.request :retry, max: @max_retries, interval: @retry_backoff, backoff_factor: 2
    builder.options.open_timeout = @read_timeout
    builder.options.timeout = @read_timeout
    builder.adapter Faraday.default_adapter
  end

  reply = @rpc_conn.post(endpoint) do |req|
    req.headers['Content-Type'] = 'application/json'
    req.headers['Accept'] = 'application/json'
    # Applied after the JSON headers, so a caller-supplied header of the same
    # name (e.g. 'Content-Type') takes precedence.
    forwarded = @headers.reject { |name, _value| %w[Accept Cache-Control].include?(name) }
    forwarded.each { |name, value| req.headers[name] = value }
    req.body = { jsonrpc: '2.0', method: method, params: params }.to_json
  end
  return if reply.success?

  raise MCPClient::Errors::ServerError, "Notification failed: #{reply.status} #{reply.reason_phrase}"
rescue StandardError => e
  raise MCPClient::Errors::TransportError, "Failed to send notification: #{e.message}"
end

#rpc_request(method, params = {}) ⇒ Object

Generic JSON-RPC request: send method with params and return result

Parameters:

  • method (String)

    JSON-RPC method name

  • params (Hash) (defaults to: {})

    parameters for the request

Returns:

  • (Object)

    result from JSON-RPC response



188
189
190
191
192
193
194
195
# File 'lib/mcp_client/server_sse.rb', line 188

# Generic JSON-RPC request: send +method+ with +params+ and return the result.
#
# The send runs inside with_retry, and a new request id is allocated inside
# that block — so each execution of the block uses a fresh id.
#
# @param method [String] JSON-RPC method name
# @param params [Hash] parameters for the request
# @return [Object] whatever send_jsonrpc_request returns for the request
def rpc_request(method, params = {})
  ensure_initialized

  with_retry do
    next_id = @mutex.synchronize { @request_id += 1 }
    send_jsonrpc_request({ jsonrpc: '2.0', id: next_id, method: method, params: params })
  end
end