Class: MCPClient::ServerSSE

Inherits:
ServerBase show all
Includes:
JsonRpcTransport, ReconnectMonitor, SseParser
Defined in:
lib/mcp_client/server_sse.rb,
lib/mcp_client/server_sse/sse_parser.rb,
lib/mcp_client/server_sse/reconnect_monitor.rb,
lib/mcp_client/server_sse/json_rpc_transport.rb

Overview

Implementation of MCP server that communicates via Server-Sent Events (SSE) Useful for communicating with remote MCP servers over HTTP

Defined Under Namespace

Modules: JsonRpcTransport, ReconnectMonitor, SseParser

Constant Summary collapse

CLOSE_AFTER_PING_RATIO =

Ratio of close_after timeout to ping interval

2.5
DEFAULT_MAX_PING_FAILURES =

Default values for connection monitoring

3
DEFAULT_MAX_RECONNECT_ATTEMPTS =
5
BASE_RECONNECT_DELAY =

Reconnection backoff constants

0.5
MAX_RECONNECT_DELAY =
30
JITTER_FACTOR =
0.25

Instance Attribute Summary collapse

Attributes inherited from ServerBase

#name

Instance Method Summary collapse

Methods included from ReconnectMonitor

#activity_monitor_loop, #attempt_ping, #attempt_reconnection, #connection_active?, #handle_ping_failure, #handle_sse_auth_error, #record_activity, #reset_connection_state, #setup_sse_connection, #start_activity_monitor, #wait_for_connection

Methods included from JsonRpcTransport

#rpc_notify, #rpc_request

Methods included from JsonRpcCommon

#build_jsonrpc_notification, #build_jsonrpc_request, #initialization_params, #ping, #process_jsonrpc_response, #with_retry

Methods included from SseParser

#handle_endpoint_event, #handle_message_event, #parse_and_handle_sse_event, #parse_sse_event, #process_error_in_message, #process_notification?, #process_response?

Methods inherited from ServerBase

#on_notification, #ping, #rpc_notify, #rpc_request

Constructor Details

#initialize(base_url:, headers: {}, read_timeout: 30, ping: 10, retries: 0, retry_backoff: 1, name: nil, logger: nil) ⇒ ServerSSE

Returns a new instance of ServerSSE.

Parameters:

  • base_url (String)

    The base URL of the MCP server

  • headers (Hash) (defaults to: {})

    Additional headers to include in requests

  • read_timeout (Integer) (defaults to: 30)

    Read timeout in seconds (default: 30)

  • ping (Integer) (defaults to: 10)

    Time in seconds after which to send ping if no activity (default: 10)

  • retries (Integer) (defaults to: 0)

    number of retry attempts on transient errors

  • retry_backoff (Numeric) (defaults to: 1)

    base delay in seconds for exponential backoff

  • name (String, nil) (defaults to: nil)

    optional name for this server

  • logger (Logger, nil) (defaults to: nil)

    optional logger



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/mcp_client/server_sse.rb', line 57

def initialize(base_url:, headers: {}, read_timeout: 30, ping: 10,
               retries: 0, retry_backoff: 1, name: nil, logger: nil)
  super(name: name)
  initialize_logger(logger)
  @max_retries = retries
  @retry_backoff = retry_backoff
  # Normalize base_url: preserve trailing slash if explicitly provided for SSE endpoints
  @base_url = base_url
  @headers = headers.merge({
                             'Accept' => 'text/event-stream',
                             'Cache-Control' => 'no-cache',
                             'Connection' => 'keep-alive'
                           })
  # HTTP client is managed via Faraday
  @tools = nil
  @read_timeout = read_timeout
  @ping_interval = ping
  # Set close_after to a multiple of the ping interval
  @close_after = (ping * CLOSE_AFTER_PING_RATIO).to_i

  # SSE-provided JSON-RPC endpoint path for POST requests
  @rpc_endpoint = nil
  @tools_data = nil
  @request_id = 0
  @sse_results = {}
  @mutex = Monitor.new
  @buffer = ''
  @sse_connected = false
  @connection_established = false
  @connection_cv = @mutex.new_cond
  @initialized = false
  @auth_error = nil
  # Whether to use SSE transport; may disable if handshake fails
  @use_sse = true

  # Time of last activity
  @last_activity_time = Time.now
  @activity_timer_thread = nil
end

Instance Attribute Details

#base_urlString (readonly)

Returns The base URL of the MCP server.

Returns:

  • (String)

    The base URL of the MCP server



39
40
41
# File 'lib/mcp_client/server_sse.rb', line 39

def base_url
  @base_url
end

#capabilitiesHash? (readonly)

Server capabilities from initialize response

Returns:

  • (Hash, nil)

    Server capabilities



47
48
49
# File 'lib/mcp_client/server_sse.rb', line 47

def capabilities
  @capabilities
end

#server_infoHash? (readonly)

Server information from initialize response

Returns:

  • (Hash, nil)

    Server information



43
44
45
# File 'lib/mcp_client/server_sse.rb', line 43

def server_info
  @server_info
end

#toolsObject (readonly)

Returns the value of attribute tools.



39
# File 'lib/mcp_client/server_sse.rb', line 39

attr_reader :base_url, :tools

Instance Method Details

#call_tool(tool_name, parameters) ⇒ Object

Call a tool with the given parameters Call a tool with the given parameters

Parameters:

  • tool_name (String)

    the name of the tool to call

  • parameters (Hash)

    the parameters to pass to the tool

  • tool_name (String)

    the name of the tool to call

  • parameters (Hash)

    the parameters to pass to the tool

Returns:

  • (Object)

    the result of the tool invocation

  • (Object)

    the result of the tool invocation (with string keys for backward compatibility)

Raises:



148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/mcp_client/server_sse.rb', line 148

def call_tool(tool_name, parameters)
  rpc_request('tools/call', {
                name: tool_name,
                arguments: parameters
              })
rescue MCPClient::Errors::ConnectionError, MCPClient::Errors::TransportError
  # Re-raise connection/transport errors directly to match test expectations
  raise
rescue StandardError => e
  # For all other errors, wrap in ToolCallError
  raise MCPClient::Errors::ToolCallError, "Error calling tool '#{tool_name}': #{e.message}"
end

#call_tool_streaming(tool_name, parameters) ⇒ Enumerator

Stream tool call fallback for SSE transport (yields single result)

Parameters:

  • tool_name (String)
  • parameters (Hash)

Returns:

  • (Enumerator)


101
102
103
104
105
# File 'lib/mcp_client/server_sse.rb', line 101

def call_tool_streaming(tool_name, parameters)
  Enumerator.new do |yielder|
    yielder << call_tool(tool_name, parameters)
  end
end

#cleanupObject

Note:

This method preserves ping failure and reconnection metrics between reconnection attempts, allowing the client to track failures across multiple connection attempts. This is essential for proper reconnection logic and exponential backoff.

Clean up the server connection Properly closes HTTP connections and clears cached state



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/mcp_client/server_sse.rb', line 201

def cleanup
  @mutex.synchronize do
    # Set flags first before killing threads to prevent race conditions
    # where threads might check flags after they're set but before they're killed
    @connection_established = false
    @sse_connected = false
    @initialized = false # Reset initialization state for reconnection

    # Log cleanup for debugging
    @logger.debug('Cleaning up SSE connection')

    # Store threads locally to avoid race conditions
    sse_thread = @sse_thread
    activity_thread = @activity_timer_thread

    # Clear thread references first
    @sse_thread = nil
    @activity_timer_thread = nil

    # Kill threads outside the critical section
    begin
      sse_thread&.kill
    rescue StandardError => e
      @logger.debug("Error killing SSE thread: #{e.message}")
    end

    begin
      activity_thread&.kill
    rescue StandardError => e
      @logger.debug("Error killing activity thread: #{e.message}")
    end

    if @http_client
      @http_client.finish if @http_client.started?
      @http_client = nil
    end

    # Close Faraday connections if they exist
    @rpc_conn = nil
    @sse_conn = nil

    @tools = nil
    # Don't clear auth error as we need it for reporting the correct error
    # Don't reset @consecutive_ping_failures or @reconnect_attempts as they're tracked across reconnections
  end
end

#connectBoolean

Connect to the MCP server over HTTP/HTTPS with SSE

Returns:

  • (Boolean)

    true if connection was successful

Raises:



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/mcp_client/server_sse.rb', line 164

def connect
  return true if @mutex.synchronize { @connection_established }

  # Check for pre-existing auth error (needed for tests)
  pre_existing_auth_error = @mutex.synchronize { @auth_error }

  begin
    # Don't reset auth error if it's pre-existing
    @mutex.synchronize { @auth_error = nil } unless pre_existing_auth_error

    start_sse_thread
    effective_timeout = [@read_timeout || 30, 30].min
    wait_for_connection(timeout: effective_timeout)
    start_activity_monitor
    true
  rescue MCPClient::Errors::ConnectionError => e
    cleanup
    # Simply pass through any ConnectionError without wrapping it again
    # This prevents duplicate error messages in the stack
    raise e
  rescue StandardError => e
    cleanup
    # Check for stored auth error first as it's more specific
    auth_error = @mutex.synchronize { @auth_error }
    raise MCPClient::Errors::ConnectionError, auth_error if auth_error

    raise MCPClient::Errors::ConnectionError, "Failed to connect to MCP server at #{@base_url}: #{e.message}"
  end
end

#list_toolsArray<MCPClient::Tool>

List all tools available from the MCP server

Returns:

Raises:



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/mcp_client/server_sse.rb', line 112

def list_tools
  @mutex.synchronize do
    return @tools if @tools
  end

  begin
    ensure_initialized

    tools_data = request_tools_list
    @mutex.synchronize do
      @tools = tools_data.map do |tool_data|
        MCPClient::Tool.from_json(tool_data, server: self)
      end
    end

    @mutex.synchronize { @tools }
  rescue MCPClient::Errors::ConnectionError, MCPClient::Errors::TransportError, MCPClient::Errors::ServerError
    # Re-raise these errors directly
    raise
  rescue StandardError => e
    raise MCPClient::Errors::ToolCallError, "Error listing tools: #{e.message}"
  end
end