Module: MCPClient::ServerSSE::ReconnectMonitor

Included in:
MCPClient::ServerSSE
Defined in:
lib/mcp_client/server_sse/reconnect_monitor.rb

Overview

Extracted module for back-off, ping, and reconnection logic

Instance Method Summary collapse

Instance Method Details

#activity_monitor_loop ⇒ void

This method returns an undefined value.

Main loop for the activity monitor thread



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 36

# Body of the activity monitor thread. Wakes once per second and decides
# whether to stop, close an idle connection, send a ping, or reconnect.
#
# The loop ends (and the method returns nil) as soon as connection_active?
# is false, or right after cleanup has been called because the idle time
# reached @close_after.
#
# @return [void]
def activity_monitor_loop
  loop do
    sleep 1

    unless connection_active?
      @logger.debug('Activity monitor exiting: connection no longer active')
      break
    end

    # Fill in any counter or limit that has not been set yet.
    @mutex.synchronize do
      @consecutive_ping_failures ||= 0
      @reconnect_attempts ||= 0
      @max_ping_failures ||= DEFAULT_MAX_PING_FAILURES
      @max_reconnect_attempts ||= DEFAULT_MAX_RECONNECT_ATTEMPTS
    end

    break unless connection_active?

    idle_seconds = Time.now - @last_activity_time

    # An idle timeout takes precedence over pinging.
    if @close_after && idle_seconds >= @close_after
      @logger.info("Closing connection due to inactivity (#{idle_seconds.round(1)}s)")
      cleanup
      break
    end

    ping_due = @ping_interval && idle_seconds >= @ping_interval
    next unless ping_due

    # The connection may have gone away since the previous check.
    break unless connection_active?

    # Too many failed pings in a row: stop pinging and try to reconnect.
    failure_limit_reached = @consecutive_ping_failures >= @max_ping_failures
    failure_limit_reached ? attempt_reconnection : attempt_ping
  end
end

#attempt_ping ⇒ void

This method returns an undefined value.

Attempt to ping the server to check if connection is still alive



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 113

# Sends a ping to check that the connection is still alive.
#
# Nothing is sent when connection_active? is false. A successful ping
# refreshes @last_activity_time and clears the consecutive-failure counter.
# A failed ping is passed to handle_ping_failure, unless the connection was
# closed in the meantime, in which case the error is only logged at debug level.
#
# @return [void]
def attempt_ping
  if connection_active?
    idle_seconds = Time.now - @last_activity_time
    @logger.debug("Sending ping after #{idle_seconds.round(1)}s of inactivity")

    begin
      ping
      @mutex.synchronize do
        @last_activity_time = Time.now
        @consecutive_ping_failures = 0
      end
    rescue StandardError => e
      if connection_active?
        handle_ping_failure(e)
      else
        # A failure on a connection that is already closed is not counted.
        @logger.debug("Ignoring ping failure - connection already closed: #{e.message}")
        nil
      end
    end
  else
    @logger.debug('Skipping ping - connection not active')
    nil
  end
end

#attempt_reconnection ⇒ void

This method returns an undefined value.

Attempt to reconnect with exponential backoff



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 76

# Tries to re-establish the connection, waiting first for an exponentially
# growing delay with random jitter.
#
# The delay is BASE_RECONNECT_DELAY * 2**attempts plus jitter, capped at
# MAX_RECONNECT_DELAY. After the wait the current connection is torn down
# with cleanup and connect is called. Success resets the ping-failure and
# attempt counters and refreshes @last_activity_time; any StandardError is
# logged and counted as one more attempt. Once @max_reconnect_attempts is
# reached, the connection is cleaned up without another attempt.
#
# @return [void]
def attempt_reconnection
  unless @reconnect_attempts < @max_reconnect_attempts
    @logger.error("Exceeded maximum reconnection attempts (#{@max_reconnect_attempts}). Closing connection.")
    return cleanup
  end

  begin
    exponential_delay = BASE_RECONNECT_DELAY * (2**@reconnect_attempts)
    jitter = rand * JITTER_FACTOR * exponential_delay
    wait_seconds = [exponential_delay + jitter, MAX_RECONNECT_DELAY].min

    @logger.warn(
      "Attempting to reconnect (attempt #{@reconnect_attempts + 1}/#{@max_reconnect_attempts}) " \
      "after #{@consecutive_ping_failures} consecutive ping failures. " \
      "Waiting #{wait_seconds.round(2)}s before reconnect..."
    )
    sleep(wait_seconds)

    # Drop the old connection before opening a new one.
    cleanup
    connect
    @logger.info('Successfully reconnected after ping failures')

    @mutex.synchronize do
      @consecutive_ping_failures = 0
      # A successful reconnect starts the back-off sequence over.
      @reconnect_attempts = 0
      @last_activity_time = Time.now
    end
  rescue StandardError => e
    @logger.error("Failed to reconnect after ping failures: #{e.message}")
    @mutex.synchronize { @reconnect_attempts += 1 }
  end
end

#connection_active? ⇒ Boolean

Check if the connection is currently active



29
30
31
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 29

# Reports whether the connection is currently active.
#
# Both flags are read while holding @mutex. The result is truthy only when
# @connection_established and @sse_connected are both truthy; otherwise the
# first falsy flag (false or nil) is returned as-is.
#
# @return [Boolean, nil]
def connection_active?
  @mutex.synchronize do
    established = @connection_established
    established && @sse_connected
  end
end

#handle_ping_failure(error) ⇒ void

This method returns an undefined value.

Handle ping failures by incrementing a counter and logging



141
142
143
144
145
146
147
148
149
150
151
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 141

# Records one more consecutive ping failure and logs it.
#
# The first failure in a row is logged at error level with the full message;
# later ones are logged at warn level with the running count and only the
# first line of the message.
#
# @param error [StandardError] the error raised by the failed ping
# @return [void]
def handle_ping_failure(error)
  # Take the count from the synchronized increment itself. Re-reading the
  # instance variable after the lock is released could observe a value that
  # another thread has changed since (for example a reset to 0).
  consecutive_failures = @mutex.synchronize { @consecutive_ping_failures += 1 }

  if consecutive_failures == 1
    @logger.error("Error sending ping: #{error.message}")
  else
    error_msg = error.message.split("\n").first
    @logger.warn("Ping failed (#{consecutive_failures}/#{@max_ping_failures}): #{error_msg}")
  end
end

#handle_sse_auth_error(error) ⇒ void

This method returns an undefined value.

Handle authentication errors from SSE



206
207
208
209
210
211
212
213
214
215
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 206

# Handles an authorization failure reported by the SSE connection.
#
# Logs the HTTP status taken from error.response[:status], stores the message
# in @auth_error, marks the connection as not established and broadcasts on
# @connection_cv so that threads waiting on it can react.
#
# @param error [#response] error whose response responds to [] with :status
# @return [void]
def handle_sse_auth_error(error)
  status = error.response[:status]
  message = "Authorization failed: HTTP #{status}"
  @logger.error(message)

  @mutex.synchronize do
    @auth_error = message
    @connection_established = false
    @connection_cv.broadcast
  end
end

#record_activity ⇒ void

This method returns an undefined value.

Record activity to prevent unnecessary pings



155
156
157
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 155

# Notes that the connection was just used by setting @last_activity_time to
# the current time while holding @mutex. The activity monitor measures idle
# time from this timestamp.
#
# @return [void]
def record_activity
  @mutex.synchronize do
    @last_activity_time = Time.now
  end
end

#reset_connection_state ⇒ void

This method returns an undefined value.

Reset the connection state



220
221
222
223
224
225
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 220

# Marks the connection as not established and broadcasts on @connection_cv,
# both while holding @mutex, so that threads waiting on that condition
# variable (wait_for_connection does) wake up and re-check the state.
#
# @return [void]
def reset_connection_state
  @mutex.synchronize do
    @connection_established = false
    @connection_cv.broadcast
  end
end

#setup_sse_connection(uri) ⇒ Faraday::Connection

Setup the SSE connection with Faraday



187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 187

# Returns the Faraday connection used for the SSE stream, building it on the
# first call and reusing it afterwards.
#
# The connection targets scheme://host:port of +uri+, uses a 10 second open
# timeout and no read timeout, retries with @max_retries / @retry_backoff,
# follows up to 3 redirects and raises on error responses.
#
# Faraday::Response::RaiseError is registered inside the construction block,
# so it is added exactly once. Appending it on every call would stack a
# duplicate on the memoized connection, and Faraday rejects changes to the
# middleware stack once the connection has made a request.
#
# @param uri [URI] endpoint whose scheme, host and port form the base URL
# @return [Faraday::Connection]
def setup_sse_connection(uri)
  sse_base = "#{uri.scheme}://#{uri.host}:#{uri.port}"

  @sse_conn ||= Faraday.new(url: sse_base) do |f|
    f.options.open_timeout = 10
    f.options.timeout = nil # the stream stays open indefinitely
    f.request :retry, max: @max_retries, interval: @retry_backoff, backoff_factor: 2
    f.response :follow_redirects, limit: 3
    # Last middleware before the adapter, same position as before.
    f.use Faraday::Response::RaiseError
    f.adapter Faraday.default_adapter
  end
end

#start_activity_monitor ⇒ void

This method returns an undefined value.

Start an activity monitor thread to maintain the connection



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 9

# Starts the background thread that keeps the connection alive.
#
# Does nothing when a monitor thread is already alive. Otherwise the activity
# timestamp, failure counters and their limits are reset under @mutex and a
# new thread running activity_monitor_loop is stored in @activity_timer_thread.
# A StandardError escaping the loop is logged rather than propagated.
#
# @return [void]
def start_activity_monitor
  return if @activity_timer_thread&.alive?

  @mutex.synchronize do
    @last_activity_time = Time.now
    @consecutive_ping_failures = 0
    @max_ping_failures = DEFAULT_MAX_PING_FAILURES
    @reconnect_attempts = 0
    @max_reconnect_attempts = DEFAULT_MAX_RECONNECT_ATTEMPTS
  end

  monitor_body = lambda do
    activity_monitor_loop
  rescue StandardError => e
    @logger.error("Activity monitor error: #{e.message}")
  end

  @activity_timer_thread = Thread.new(&monitor_body)
end

#wait_for_connection(timeout:) ⇒ void

This method returns an undefined value.

Wait for the connection to be established

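Raises:

(MCPClient::Errors::ConnectionError) — if authorization failed, or if the connection was not established before the timeout elapsed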



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/mcp_client/server_sse/reconnect_monitor.rb', line 163

# Blocks until the connection is established, an authorization error is
# recorded, or +timeout+ seconds have elapsed.
#
# The wait is driven only by the state flags and the deadline. The return
# value of @connection_cv.wait is ignored on purpose: a Monitor condition
# variable returns true from #wait even when it merely timed out or was woken
# without the state having changed, so breaking on it would end the wait
# after the first wake-up (at most one second) regardless of +timeout+.
# NOTE(review): assumes @connection_cv is a MonitorMixin::ConditionVariable
# created from @mutex (its #wait takes only a timeout) - confirm.
#
# @param timeout [Numeric] maximum number of seconds to wait
# @return [void]
# @raise [MCPClient::Errors::ConnectionError] with the stored @auth_error
#   message when authorization failed, or with a timeout message (after
#   calling cleanup) when the connection was not established in time
def wait_for_connection(timeout:)
  @mutex.synchronize do
    deadline = Time.now + timeout

    until @connection_established || @auth_error
      remaining = deadline - Time.now
      break if remaining <= 0

      # Wake up at least once per second to re-check state and deadline.
      @connection_cv.wait([1, remaining].min)
    end

    raise MCPClient::Errors::ConnectionError, @auth_error if @auth_error

    unless @connection_established
      cleanup
      error_msg = "Failed to connect to MCP server at #{@base_url}" \
                  ': Timed out waiting for SSE connection to be established'
      raise MCPClient::Errors::ConnectionError, error_msg
    end
  end
end