Class: MCPClient::ServerSSE

Inherits:
ServerBase show all
Defined in:
lib/mcp_client/server_sse.rb

Overview

Implementation of an MCP server that communicates via Server-Sent Events (SSE). Useful for communicating with remote MCP servers over HTTP.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from ServerBase

#on_notification, #ping

Constructor Details

#initialize(base_url:, headers: {}, read_timeout: 30, retries: 0, retry_backoff: 1, logger: nil) ⇒ ServerSSE

Returns a new instance of ServerSSE.

Parameters:

  • base_url (String)

    The base URL of the MCP server

  • headers (Hash) (defaults to: {})

    Additional headers to include in requests

  • read_timeout (Integer) (defaults to: 30)

    Read timeout in seconds (default: 30)

  • retries (Integer) (defaults to: 0)

    number of retry attempts on transient errors

  • retry_backoff (Numeric) (defaults to: 1)

    base delay in seconds for exponential backoff

  • logger (Logger, nil) (defaults to: nil)

    optional logger



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/mcp_client/server_sse.rb', line 22

# Build a new SSE-backed MCP server client.
#
# @param base_url [String] base URL of the MCP server; one trailing slash is removed
# @param headers [Hash] extra request headers; the SSE headers set below take precedence
# @param read_timeout [Integer] read timeout in seconds
# @param retries [Integer] number of retry attempts on transient errors
# @param retry_backoff [Numeric] base delay in seconds for exponential backoff
# @param logger [Logger, nil] optional logger; defaults to a WARN-level logger on $stdout
def initialize(base_url:, headers: {}, read_timeout: 30, retries: 0, retry_backoff: 1, logger: nil)
  super()

  # Logging: fall back to a stdout logger, then tag and format every line
  @logger = logger || Logger.new($stdout, level: Logger::WARN)
  @logger.progname = self.class.name
  @logger.formatter = proc { |severity, _datetime, progname, msg| "#{severity} [#{progname}] #{msg}\n" }

  # Endpoint and request settings (the base URL is used as given, minus a trailing slash)
  @base_url = base_url.chomp('/')
  sse_headers = {
    'Accept' => 'text/event-stream',
    'Cache-Control' => 'no-cache',
    'Connection' => 'keep-alive'
  }
  @headers = headers.merge(sse_headers)
  @read_timeout = read_timeout
  @max_retries = retries
  @retry_backoff = retry_backoff

  # Cached server data; the JSON-RPC endpoint path for POST requests is provided over SSE
  @tools = nil
  @tools_data = nil
  @rpc_endpoint = nil

  # JSON-RPC bookkeeping
  @request_id = 0
  @sse_results = {}
  @buffer = ''

  # Connection state, coordinated through a Monitor and its condition variable
  @mutex = Monitor.new
  @connection_cv = @mutex.new_cond
  @sse_connected = false
  @connection_established = false
  @initialized = false
  # Whether to use SSE transport; may be disabled if the handshake fails
  @use_sse = true
end

Instance Attribute Details

#base_url ⇒ Object (readonly)

Returns the value of attribute base_url.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

# Reader for the server's base URL.
# @return [String] value of @base_url (the constructor stores the given URL with one trailing slash removed)
def base_url
  @base_url
end

#capabilities ⇒ Object (readonly)

Returns the value of attribute capabilities.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

# Reader for the server capabilities.
# @return [Object, nil] value of @capabilities; the constructor does not assign it, so it is
#   nil until set elsewhere — presumably during the initialize handshake (TODO confirm)
def capabilities
  @capabilities
end

#server_info ⇒ Object (readonly)

Returns the value of attribute server_info.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

# Reader for the server info.
# @return [Object, nil] value of @server_info; the constructor does not assign it, so it is
#   nil until set elsewhere — presumably during the initialize handshake (TODO confirm)
def server_info
  @server_info
end

#tools ⇒ Object (readonly)

Returns the value of attribute tools.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

# Reader for the cached tool list.
# @return [Array<MCPClient::Tool>, nil] value of @tools; nil after construction and after
#   #cleanup, populated by #list_tools
def tools
  @tools
end

Instance Method Details

#call_tool(tool_name, parameters) ⇒ Object

Call a tool with the given parameters

Parameters:

  • tool_name (String)

    the name of the tool to call

  • parameters (Hash)

    the parameters to pass to the tool

Returns:

  • (Object)

    the result of the tool invocation

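Raises:

  • (MCPClient::Errors::TransportError)

    if the transport fails or the server returns invalid JSON

  • (MCPClient::Errors::ToolCallError)

    if any other error occurs while calling the tool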



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/mcp_client/server_sse.rb', line 103

# Invoke a tool on the server through a JSON-RPC 'tools/call' request.
#
# @param tool_name [String] the name of the tool to call
# @param parameters [Hash] the parameters to pass to the tool
# @return [Object] whatever send_jsonrpc_request returns for the request
# @raise [MCPClient::Errors::TransportError] on transport errors or invalid JSON
# @raise [MCPClient::Errors::ToolCallError] on any other StandardError
def call_tool(tool_name, parameters)
  # Initialization failures are intentionally left outside the error translation below
  ensure_initialized

  begin
    next_id = @mutex.synchronize { @request_id += 1 }
    call_params = { name: tool_name, arguments: parameters }
    payload = { jsonrpc: '2.0', id: next_id, method: 'tools/call', params: call_params }

    send_jsonrpc_request(payload)
  rescue StandardError => e
    case e
    when MCPClient::Errors::TransportError
      # Transport errors pass through untouched
      raise
    when JSON::ParserError
      raise MCPClient::Errors::TransportError, "Invalid JSON response from server: #{e.message}"
    else
      raise MCPClient::Errors::ToolCallError, "Error calling tool '#{tool_name}': #{e.message}"
    end
  end
end

#call_tool_streaming(tool_name, parameters) ⇒ Enumerator

Stream tool call fallback for SSE transport (yields single result)

Parameters:

  • tool_name (String)
  • parameters (Hash)

Returns:

  • (Enumerator)


59
60
61
62
63
# File 'lib/mcp_client/server_sse.rb', line 59

# Streaming fallback for the SSE transport: the returned enumerator produces exactly
# one element, the result of #call_tool, and only performs the call once it is iterated.
#
# @param tool_name [String]
# @param parameters [Hash]
# @return [Enumerator]
def call_tool_streaming(tool_name, parameters)
  Enumerator.new { |out| out.yield(call_tool(tool_name, parameters)) }
end

#cleanup ⇒ Object

Clean up the server connection. Properly closes HTTP connections and clears cached tools.



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/mcp_client/server_sse.rb', line 157

# Clean up the server connection: stop the SSE listener thread, close the HTTP
# client if one was started, and clear the cached tools and connection flags.
#
# State is reset before the HTTP client is closed, so an error raised while
# closing cannot leave stale "connected" flags behind (which would make a later
# #connect return early on a dead connection). Such an error still propagates.
#
# @return [false] kept for compatibility with the previous return value
def cleanup
  @mutex.synchronize do
    # Best effort: a failure to kill the listener must not block the rest of cleanup
    begin
      @sse_thread&.kill
    rescue StandardError
      nil
    end
    @sse_thread = nil

    client = @http_client
    @http_client = nil
    @tools = nil
    @connection_established = false
    @sse_connected = false

    client.finish if client&.started?
    false
  end
end

#connect ⇒ Boolean

Connect to the MCP server over HTTP/HTTPS with SSE

Returns:

  • (Boolean)

    true if connection was successful

Raises:

  • (MCPClient::Errors::ConnectionError)

    if the connection cannot be established or times out



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/mcp_client/server_sse.rb', line 133

# Connect to the MCP server over HTTP/HTTPS with SSE.
#
# Starts the SSE listener and blocks until the connection is flagged as
# established or the timeout elapses. Returns immediately when already connected.
#
# @param timeout [Numeric] maximum number of seconds to wait for the SSE connection
# @return [Boolean] true once the connection is established
# @raise [MCPClient::Errors::ConnectionError] on timeout or any other failure while connecting
def connect(timeout: 10)
  @mutex.synchronize do
    return true if @connection_established

    # Start SSE listener using Faraday HTTP client
    start_sse_thread

    # The condition variable's #wait does not evaluate a predicate block and its
    # return value does not say whether the connection flag is set, so re-check
    # the flag in a loop against a monotonic deadline.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    until @connection_established
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break unless remaining.positive?

      @connection_cv.wait(remaining)
    end

    unless @connection_established
      raise MCPClient::Errors::ConnectionError, 'Timed out waiting for SSE connection to be established'
    end

    true
  end
rescue StandardError => e
  # Single cleanup point for every failure, including the timeout raised above
  cleanup
  raise MCPClient::Errors::ConnectionError, "Failed to connect to MCP server at #{@base_url}: #{e.message}"
end

#list_tools ⇒ Array<MCPClient::Tool>

List all tools available from the MCP server

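Returns:

  • (Array<MCPClient::Tool>)

    list of available tools

Raises:

  • (MCPClient::Errors::TransportError)

    if the transport fails or the server returns invalid JSON

  • (MCPClient::Errors::ToolCallError)

    if any other error occurs while listing tools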



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/mcp_client/server_sse.rb', line 70

# List all tools available from the MCP server, caching the result in @tools.
#
# @return [Array<MCPClient::Tool>] cached tools when present, otherwise a freshly fetched list
# @raise [MCPClient::Errors::TransportError] on transport errors or invalid JSON
# @raise [MCPClient::Errors::ToolCallError] on any other StandardError while listing
def list_tools
  cached = @mutex.synchronize { @tools }
  return cached if cached

  # Initialization failures are intentionally left outside the error translation below
  ensure_initialized

  begin
    raw_tools = request_tools_list
    @mutex.synchronize do
      @tools = raw_tools.map { |entry| MCPClient::Tool.from_json(entry) }
    end

    @mutex.synchronize { @tools }
  rescue MCPClient::Errors::TransportError
    # Transport errors pass through untouched
    raise
  rescue JSON::ParserError => e
    raise MCPClient::Errors::TransportError, "Invalid JSON response from server: #{e.message}"
  rescue StandardError => e
    raise MCPClient::Errors::ToolCallError, "Error listing tools: #{e.message}"
  end
end

#rpc_notify(method, params = {}) ⇒ void

This method returns an undefined value.

Send a JSON-RPC notification (no response expected)

Parameters:

  • method (String)

    JSON-RPC method name

  • params (Hash) (defaults to: {})

    parameters for the notification



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/mcp_client/server_sse.rb', line 194

# Send a JSON-RPC notification (no id, no response expected) by POSTing to the
# RPC endpoint stored in @rpc_endpoint.
#
# Any StandardError raised here, including the ServerError for a non-success HTTP
# status, is re-raised as a TransportError.
#
# @param method [String] JSON-RPC method name
# @param params [Hash] parameters for the notification
# @return [void]
# @raise [MCPClient::Errors::TransportError] if the notification could not be sent
def rpc_notify(method, params = {})
  ensure_initialized
  parsed = URI.parse(@base_url)
  origin = "#{parsed.scheme}://#{parsed.host}:#{parsed.port}"
  endpoint = @mutex.synchronize { @rpc_endpoint }
  # The connection is built once and reused for later notifications
  @rpc_conn ||= Faraday.new(url: origin) do |builder|
    builder.request :retry, max: @max_retries, interval: @retry_backoff, backoff_factor: 2
    builder.options.open_timeout = @read_timeout
    builder.options.timeout = @read_timeout
    builder.adapter Faraday.default_adapter
  end

  # Forward the configured headers except the SSE-specific Accept / Cache-Control
  forwarded = @headers.reject { |name, _value| %w[Accept Cache-Control].include?(name) }
  payload = { jsonrpc: '2.0', method: method, params: params }.to_json

  reply = @rpc_conn.post(endpoint) do |req|
    req.headers['Content-Type'] = 'application/json'
    req.headers['Accept'] = 'application/json'
    forwarded.each { |name, value| req.headers[name] = value }
    req.body = payload
  end
  return if reply.success?

  raise MCPClient::Errors::ServerError, "Notification failed: #{reply.status} #{reply.reason_phrase}"
rescue StandardError => e
  raise MCPClient::Errors::TransportError, "Failed to send notification: #{e.message}"
end

#rpc_request(method, params = {}) ⇒ Object

Generic JSON-RPC request: send method with params and return result

Parameters:

  • method (String)

    JSON-RPC method name

  • params (Hash) (defaults to: {})

    parameters for the request

Returns:

  • (Object)

    result from JSON-RPC response



181
182
183
184
185
186
187
188
# File 'lib/mcp_client/server_sse.rb', line 181

# Generic JSON-RPC request: allocate the next request id, send the method with its
# params, and return the result. The send runs inside with_retry, and a new id is
# allocated each time the block is executed.
#
# @param method [String] JSON-RPC method name
# @param params [Hash] parameters for the request
# @return [Object] result from the JSON-RPC response
def rpc_request(method, params = {})
  ensure_initialized
  with_retry do
    next_id = @mutex.synchronize { @request_id += 1 }
    send_jsonrpc_request({ jsonrpc: '2.0', id: next_id, method: method, params: params })
  end
end