Class: MCPClient::ServerSSE

Inherits:
ServerBase show all
Defined in:
lib/mcp_client/server_sse.rb

Overview

Implementation of MCP server that communicates via Server-Sent Events (SSE) Useful for communicating with remote MCP servers over HTTP

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from ServerBase

#on_notification, #ping

Constructor Details

#initialize(base_url:, headers: {}, read_timeout: 30, retries: 0, retry_backoff: 1, logger: nil) ⇒ ServerSSE

Returns a new instance of ServerSSE.

Parameters:

  • base_url (String)

    The base URL of the MCP server

  • headers (Hash) (defaults to: {})

    Additional headers to include in requests

  • read_timeout (Integer) (defaults to: 30)

    Read timeout in seconds (default: 30)

  • retries (Integer) (defaults to: 0)

    number of retry attempts on transient errors

  • retry_backoff (Numeric) (defaults to: 1)

    base delay in seconds for exponential backoff

  • logger (Logger, nil) (defaults to: nil)

    optional logger



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/mcp_client/server_sse.rb', line 22

def initialize(base_url:, headers: {}, read_timeout: 30, retries: 0, retry_backoff: 1, logger: nil)
  super()
  @logger = logger || Logger.new($stdout, level: Logger::WARN)
  @logger.progname = self.class.name
  @logger.formatter = proc { |severity, _datetime, progname, msg| "#{severity} [#{progname}] #{msg}\n" }
  @max_retries = retries
  @retry_backoff = retry_backoff
  # Normalize base_url: strip any trailing slash, use exactly as provided
  @base_url = base_url.chomp('/')
  @headers = headers.merge({
                             'Accept' => 'text/event-stream',
                             'Cache-Control' => 'no-cache',
                             'Connection' => 'keep-alive'
                           })
  @http_client = nil
  @tools = nil
  @read_timeout = read_timeout
  @session_id = nil
  @tools_data = nil
  @request_id = 0
  @sse_results = {}
  @mutex = Monitor.new
  @buffer = ''
  @sse_connected = false
  @connection_established = false
  @connection_cv = @mutex.new_cond
  @initialized = false
  # Whether to use SSE transport; may disable if handshake fails
  @use_sse = true
end

Instance Attribute Details

#base_urlObject (readonly)

Returns the value of attribute base_url.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

def base_url
  @base_url
end

#capabilitiesObject (readonly)

Returns the value of attribute capabilities.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

def capabilities
  @capabilities
end

#http_clientObject (readonly)

Returns the value of attribute http_client.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

def http_client
  @http_client
end

#server_infoObject (readonly)

Returns the value of attribute server_info.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

def server_info
  @server_info
end

#session_idObject (readonly)

Returns the value of attribute session_id.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

def session_id
  @session_id
end

#toolsObject (readonly)

Returns the value of attribute tools.



14
15
16
# File 'lib/mcp_client/server_sse.rb', line 14

def tools
  @tools
end

Instance Method Details

#call_tool(tool_name, parameters) ⇒ Object

Call a tool with the given parameters

Parameters:

  • tool_name (String)

    the name of the tool to call

  • parameters (Hash)

    the parameters to pass to the tool

Returns:

  • (Object)

    the result of the tool invocation

Raises:



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/mcp_client/server_sse.rb', line 101

def call_tool(tool_name, parameters)
  ensure_initialized

  begin
    request_id = @mutex.synchronize { @request_id += 1 }

    json_rpc_request = {
      jsonrpc: '2.0',
      id: request_id,
      method: 'tools/call',
      params: {
        name: tool_name,
        arguments: parameters
      }
    }

    send_jsonrpc_request(json_rpc_request)
  rescue MCPClient::Errors::TransportError
    # Re-raise TransportError directly
    raise
  rescue JSON::ParserError => e
    raise MCPClient::Errors::TransportError, "Invalid JSON response from server: #{e.message}"
  rescue StandardError => e
    raise MCPClient::Errors::ToolCallError, "Error calling tool '#{tool_name}': #{e.message}"
  end
end

#call_tool_streaming(tool_name, parameters) ⇒ Enumerator

Stream tool call fallback for SSE transport (yields single result)

Parameters:

  • tool_name (String)
  • parameters (Hash)

Returns:

  • (Enumerator)


57
58
59
60
61
# File 'lib/mcp_client/server_sse.rb', line 57

def call_tool_streaming(tool_name, parameters)
  Enumerator.new do |yielder|
    yielder << call_tool(tool_name, parameters)
  end
end

#cleanupObject

Clean up the server connection Properly closes HTTP connections and clears cached tools



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/mcp_client/server_sse.rb', line 167

def cleanup
  @mutex.synchronize do
    begin
      @sse_thread&.kill
    rescue StandardError
      nil
    end
    @sse_thread = nil

    if @http_client
      @http_client.finish if @http_client.started?
      @http_client = nil
    end

    @tools = nil
    @session_id = nil
    @connection_established = false
    @sse_connected = false
  end
end

#connectBoolean

Connect to the MCP server over HTTP/HTTPS with SSE

Returns:

  • (Boolean)

    true if connection was successful

Raises:



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/mcp_client/server_sse.rb', line 131

def connect
  @mutex.synchronize do
    return true if @connection_established

    uri = URI.parse(@base_url)
    @http_client = Net::HTTP.new(uri.host, uri.port)

    if uri.scheme == 'https'
      @http_client.use_ssl = true
      @http_client.verify_mode = OpenSSL::SSL::VERIFY_PEER
    end

    @http_client.open_timeout = 10
    @http_client.read_timeout = @read_timeout
    @http_client.keep_alive_timeout = 60

    @http_client.start
    start_sse_thread

    timeout = 10
    success = @connection_cv.wait(timeout) { @connection_established }

    unless success
      cleanup
      raise MCPClient::Errors::ConnectionError, 'Timed out waiting for SSE connection to be established'
    end

    @connection_established
  end
rescue StandardError => e
  cleanup
  raise MCPClient::Errors::ConnectionError, "Failed to connect to MCP server at #{@base_url}: #{e.message}"
end

#list_toolsArray<MCPClient::Tool>

List all tools available from the MCP server

Returns:

Raises:



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/mcp_client/server_sse.rb', line 68

def list_tools
  @mutex.synchronize do
    return @tools if @tools
  end

  ensure_initialized

  begin
    tools_data = request_tools_list
    @mutex.synchronize do
      @tools = tools_data.map do |tool_data|
        MCPClient::Tool.from_json(tool_data)
      end
    end

    @mutex.synchronize { @tools }
  rescue MCPClient::Errors::TransportError
    # Re-raise TransportError directly
    raise
  rescue JSON::ParserError => e
    raise MCPClient::Errors::TransportError, "Invalid JSON response from server: #{e.message}"
  rescue StandardError => e
    raise MCPClient::Errors::ToolCallError, "Error listing tools: #{e.message}"
  end
end

#rpc_notify(method, params = {}) ⇒ void

This method returns an undefined value.

Send a JSON-RPC notification (no response expected)

Parameters:

  • method (String)

    JSON-RPC method name

  • params (Hash) (defaults to: {})

    parameters for the notification



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/mcp_client/server_sse.rb', line 205

def rpc_notify(method, params = {})
  ensure_initialized
  url_base = @base_url.sub(%r{/sse/?$}, '')
  uri = URI.parse("#{url_base}/messages?sessionId=#{@session_id}")
  rpc_http = Net::HTTP.new(uri.host, uri.port)
  if uri.scheme == 'https'
    rpc_http.use_ssl = true
    rpc_http.verify_mode = OpenSSL::SSL::VERIFY_PEER
  end
  rpc_http.open_timeout = 10
  rpc_http.read_timeout = @read_timeout
  rpc_http.keep_alive_timeout = 60
  rpc_http.start do |http|
    http_req = Net::HTTP::Post.new(uri)
    http_req.content_type = 'application/json'
    http_req.body = { jsonrpc: '2.0', method: method, params: params }.to_json
    headers = @headers.dup
    headers.except('Accept', 'Cache-Control').each { |k, v| http_req[k] = v }
    response = http.request(http_req)
    unless response.is_a?(Net::HTTPSuccess)
      raise MCPClient::Errors::ServerError, "Notification failed: #{response.code} #{response.message}"
    end
  end
rescue StandardError => e
  raise MCPClient::Errors::TransportError, "Failed to send notification: #{e.message}"
ensure
  rpc_http.finish if rpc_http&.started?
end

#rpc_request(method, params = {}) ⇒ Object

Generic JSON-RPC request: send method with params and return result

Parameters:

  • method (String)

    JSON-RPC method name

  • params (Hash) (defaults to: {})

    parameters for the request

Returns:

  • (Object)

    result from JSON-RPC response



192
193
194
195
196
197
198
199
# File 'lib/mcp_client/server_sse.rb', line 192

def rpc_request(method, params = {})
  ensure_initialized
  with_retry do
    request_id = @mutex.synchronize { @request_id += 1 }
    request = { jsonrpc: '2.0', id: request_id, method: method, params: params }
    send_jsonrpc_request(request)
  end
end