Class: MCPClient::ServerSSE

Inherits:
ServerBase show all
Defined in:
lib/mcp_client/server_sse.rb

Overview

Implementation of MCP server that communicates via Server-Sent Events (SSE) Useful for communicating with remote MCP servers over HTTP

Constant Summary collapse

CLOSE_AFTER_PING_RATIO =

Ratio of close_after timeout to ping interval

2.5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from ServerBase

#on_notification

Constructor Details

#initialize(base_url:, headers: {}, read_timeout: 30, ping: 10, retries: 0, retry_backoff: 1, logger: nil) ⇒ ServerSSE



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/mcp_client/server_sse.rb', line 26

def initialize(base_url:, headers: {}, read_timeout: 30, ping: 10,
               retries: 0, retry_backoff: 1, logger: nil)
  super()
  @logger = logger || Logger.new($stdout, level: Logger::WARN)
  @logger.progname = self.class.name
  @logger.formatter = proc { |severity, _datetime, progname, msg| "#{severity} [#{progname}] #{msg}\n" }
  @max_retries = retries
  @retry_backoff = retry_backoff
  # Normalize base_url: strip any trailing slash, use exactly as provided
  @base_url = base_url.chomp('/')
  @headers = headers.merge({
                             'Accept' => 'text/event-stream',
                             'Cache-Control' => 'no-cache',
                             'Connection' => 'keep-alive'
                           })
  # HTTP client is managed via Faraday
  @tools = nil
  @read_timeout = read_timeout
  @ping_interval = ping
  # Set close_after to a multiple of the ping interval
  @close_after = (ping * CLOSE_AFTER_PING_RATIO).to_i

  # SSE-provided JSON-RPC endpoint path for POST requests
  @rpc_endpoint = nil
  @tools_data = nil
  @request_id = 0
  @sse_results = {}
  @mutex = Monitor.new
  @buffer = ''
  @sse_connected = false
  @connection_established = false
  @connection_cv = @mutex.new_cond
  @initialized = false
  @auth_error = nil
  # Whether to use SSE transport; may disable if handshake fails
  @use_sse = true

  # Time of last activity
  @last_activity_time = Time.now
  @activity_timer_thread = nil
end

Instance Attribute Details

#base_urlObject (readonly)

Returns the value of attribute base_url.



17
18
19
# File 'lib/mcp_client/server_sse.rb', line 17

def base_url
  @base_url
end

#capabilitiesObject (readonly)

Returns the value of attribute capabilities.



17
18
19
# File 'lib/mcp_client/server_sse.rb', line 17

def capabilities
  @capabilities
end

#server_infoObject (readonly)

Returns the value of attribute server_info.



17
18
19
# File 'lib/mcp_client/server_sse.rb', line 17

def server_info
  @server_info
end

#toolsObject (readonly)

Returns the value of attribute tools.



17
18
19
# File 'lib/mcp_client/server_sse.rb', line 17

def tools
  @tools
end

Instance Method Details

#call_tool(tool_name, parameters) ⇒ Object

Call a tool with the given parameters

Raises:



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/mcp_client/server_sse.rb', line 116

def call_tool(tool_name, parameters)
  ensure_initialized

  begin
    request_id = @mutex.synchronize { @request_id += 1 }

    json_rpc_request = {
      jsonrpc: '2.0',
      id: request_id,
      method: 'tools/call',
      params: {
        name: tool_name,
        arguments: parameters
      }
    }

    send_jsonrpc_request(json_rpc_request)
  rescue MCPClient::Errors::TransportError
    # Re-raise TransportError directly
    raise
  rescue JSON::ParserError => e
    raise MCPClient::Errors::TransportError, "Invalid JSON response from server: #{e.message}"
  rescue StandardError => e
    raise MCPClient::Errors::ToolCallError, "Error calling tool '#{tool_name}': #{e.message}"
  end
end

#call_tool_streaming(tool_name, parameters) ⇒ Enumerator

Stream tool call fallback for SSE transport (yields single result)



72
73
74
75
76
# File 'lib/mcp_client/server_sse.rb', line 72

def call_tool_streaming(tool_name, parameters)
  Enumerator.new do |yielder|
    yielder << call_tool(tool_name, parameters)
  end
end

#cleanupObject

Clean up the server connection Properly closes HTTP connections and clears cached tools



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/mcp_client/server_sse.rb', line 176

def cleanup
  @mutex.synchronize do
    begin
      @sse_thread&.kill
    rescue StandardError
      nil
    end
    @sse_thread = nil

    begin
      @activity_timer_thread&.kill
    rescue StandardError
      nil
    end
    @activity_timer_thread = nil

    if @http_client
      @http_client.finish if @http_client.started?
      @http_client = nil
    end

    @tools = nil
    @connection_established = false
    @sse_connected = false
    # Don't clear auth error as we need it for reporting the correct error
  end
end

#connectBoolean

Connect to the MCP server over HTTP/HTTPS with SSE

Raises:



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/mcp_client/server_sse.rb', line 146

def connect
  return true if @mutex.synchronize { @connection_established }

  begin
    start_sse_thread
    effective_timeout = [@read_timeout || 30, 30].min
    wait_for_connection(timeout: effective_timeout)
    start_activity_monitor
    true
  rescue MCPClient::Errors::ConnectionError => e
    cleanup
    # Check for stored auth error first, as it's more specific
    auth_error = @mutex.synchronize { @auth_error }
    raise MCPClient::Errors::ConnectionError, auth_error if auth_error

    raise MCPClient::Errors::ConnectionError, e.message if e.message.include?('Authorization failed')

    raise MCPClient::Errors::ConnectionError, "Failed to connect to MCP server at #{@base_url}: #{e.message}"
  rescue StandardError => e
    cleanup
    # Check for stored auth error
    auth_error = @mutex.synchronize { @auth_error }
    raise MCPClient::Errors::ConnectionError, auth_error if auth_error

    raise MCPClient::Errors::ConnectionError, "Failed to connect to MCP server at #{@base_url}: #{e.message}"
  end
end

#list_toolsArray<MCPClient::Tool>

List all tools available from the MCP server

Raises:



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/mcp_client/server_sse.rb', line 83

def list_tools
  @mutex.synchronize do
    return @tools if @tools
  end

  ensure_initialized

  begin
    tools_data = request_tools_list
    @mutex.synchronize do
      @tools = tools_data.map do |tool_data|
        MCPClient::Tool.from_json(tool_data)
      end
    end

    @mutex.synchronize { @tools }
  rescue MCPClient::Errors::TransportError
    # Re-raise TransportError directly
    raise
  rescue JSON::ParserError => e
    raise MCPClient::Errors::TransportError, "Invalid JSON response from server: #{e.message}"
  rescue StandardError => e
    raise MCPClient::Errors::ToolCallError, "Error listing tools: #{e.message}"
  end
end

#pingHash

Ping the server to keep the connection alive



252
253
254
# File 'lib/mcp_client/server_sse.rb', line 252

def ping
  rpc_request('ping')
end

#rpc_notify(method, params = {}) ⇒ void

This method returns an undefined value.

Send a JSON-RPC notification (no response expected)



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/mcp_client/server_sse.rb', line 221

def rpc_notify(method, params = {})
  ensure_initialized
  uri = URI.parse(@base_url)
  base = "#{uri.scheme}://#{uri.host}:#{uri.port}"
  rpc_ep = @mutex.synchronize { @rpc_endpoint }
  @rpc_conn ||= Faraday.new(url: base) do |f|
    f.request :retry, max: @max_retries, interval: @retry_backoff, backoff_factor: 2
    f.options.open_timeout = @read_timeout
    f.options.timeout = @read_timeout
    f.adapter Faraday.default_adapter
  end
  response = @rpc_conn.post(rpc_ep) do |req|
    req.headers['Content-Type'] = 'application/json'
    req.headers['Accept'] = 'application/json'
    (@headers.dup.tap do |h|
      h.delete('Accept')
      h.delete('Cache-Control')
    end).each do |k, v|
      req.headers[k] = v
    end
    req.body = { jsonrpc: '2.0', method: method, params: params }.to_json
  end
  unless response.success?
    raise MCPClient::Errors::ServerError, "Notification failed: #{response.status} #{response.reason_phrase}"
  end
rescue StandardError => e
  raise MCPClient::Errors::TransportError, "Failed to send notification: #{e.message}"
end

#rpc_request(method, params = {}) ⇒ Object

Generic JSON-RPC request: send method with params and return result



208
209
210
211
212
213
214
215
# File 'lib/mcp_client/server_sse.rb', line 208

def rpc_request(method, params = {})
  ensure_initialized
  with_retry do
    request_id = @mutex.synchronize { @request_id += 1 }
    request = { jsonrpc: '2.0', id: request_id, method: method, params: params }
    send_jsonrpc_request(request)
  end
end