Module: MCPClient::ServerSSE::SseParser

Included in:
MCPClient::ServerSSE
Defined in:
lib/mcp_client/server_sse/sse_parser.rb

Overview

Wire-level SSE parsing and dispatch.

Instance Method Summary

Instance Method Details

#handle_endpoint_event(data) ⇒ Object

Handle the special “endpoint” control frame (for SSE handshake)

Parameters:

  • data (String)

    the raw endpoint payload



121
122
123
124
125
126
127
128
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 121

# Record the endpoint announced by the server's "endpoint" control frame and
# mark the connection as established.
#
# All state changes and the condition-variable broadcast happen while holding
# @mutex, so they are published together.
#
# @param data [String] the raw endpoint payload, stored verbatim as @rpc_endpoint
# @return [Object] whatever @connection_cv.broadcast returns
def handle_endpoint_event(data)
  @mutex.synchronize do
    @rpc_endpoint = data
    @sse_connected = true
    @connection_established = true
    # Presumably wakes threads blocked waiting for the handshake — confirm against the waiter.
    @connection_cv.broadcast
  end
end

#handle_message_event(event) ⇒ Object

Handle a “message” SSE event (payload is JSON-RPC over SSE)

Parameters:

  • event (Hash)

    the parsed SSE event (with :data, :id, etc)



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 27

# Handle a "message" SSE event whose payload is JSON-RPC.
#
# The payload is parsed as JSON and offered, in order, to
# process_error_in_message, process_notification? and process_response?;
# the first one that reports it handled the payload ends processing.
#
# NOTE(review): process_error_in_message reports true for every payload with
# an 'error' member, so an error response that also carries an 'id' never
# reaches process_response? from here — confirm that this is intended.
#
# @param event [Hash] the parsed SSE event (with :data, :id, etc)
# @return [Object, nil] nil when the payload is missing or empty; otherwise the
#   value of the last step that ran
# @raise [MCPClient::Errors::ConnectionError] re-raised untouched from any step
def handle_message_event(event)
  payload = event[:data]
  # This guard runs outside the rescue below, so it must tolerate a missing
  # :data value instead of raising NoMethodError to the caller.
  return if payload.nil? || payload.empty?

  begin
    data = JSON.parse(payload)

    return if process_error_in_message(data)
    return if process_notification?(data)

    process_response?(data)
  rescue MCPClient::Errors::ConnectionError
    # Connection-level failures must reach the caller; everything else is logged.
    raise
  rescue JSON::ParserError => e
    @logger.warn("Failed to parse JSON from event data: #{e.message}")
  rescue StandardError => e
    @logger.error("Error processing SSE event: #{e.message}")
  end
end

#parse_and_handle_sse_event(event_data) ⇒ Object

Parse and handle a raw SSE event payload.

Parameters:

  • event_data (String)

    the raw event chunk



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 11

# Parse one raw SSE chunk and route it to the handler for its event type.
#
# "endpoint" events go to handle_endpoint_event (with the event's data),
# "message" events go to handle_message_event (with the whole event), and
# "ping" or any other event type is ignored.
#
# @param event_data [String] the raw event chunk
# @return [Object, nil] the handler's return value, or nil when nothing was dispatched
def parse_and_handle_sse_event(event_data)
  parsed = parse_sse_event(event_data)
  return if parsed.nil?

  kind = parsed[:event]
  if kind == 'endpoint'
    handle_endpoint_event(parsed[:data])
  elsif kind == 'message'
    handle_message_event(parsed)
  end
  # 'ping' and unrecognised event types are deliberately a no-op.
end

#parse_sse_event(event_data) ⇒ Hash?

Parse a raw SSE chunk into its :event, :data, :id fields

Parameters:

  • event_data (String)

    the raw SSE block

Returns:

  • (Hash, nil)

    parsed fields or nil if it was pure comment/blank



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 95

# Parse a raw SSE block into its :event, :data and :id fields.
#
# Lines may be terminated by LF, CRLF or a lone CR. Blank lines and comment
# lines (starting with ':') are skipped. Multiple data lines are joined with
# "\n". Field values are fully stripped of surrounding whitespace, which is
# more lenient than removing only a single leading space.
#
# @param event_data [String] the raw SSE block
# @return [Hash, nil] parsed fields (:event defaults to 'message', :data to '',
#   :id to nil), or nil if the block was pure comment/blank
def parse_sse_event(event_data)
  event       = { event: 'message', data: '', id: nil }
  data_lines  = []
  has_content = false

  # Split on every line terminator the SSE format permits; each_line alone
  # would leave CR-terminated lines glued together.
  event_data.split(/\r\n|\r|\n/).each do |line|
    next if line.empty? # blank line
    next if line.start_with?(':') # SSE comment

    has_content = true
    if line.start_with?('event:')
      event[:event] = line[6..].strip
    elsif line.start_with?('data:')
      data_lines << line[5..].strip
    elsif line.start_with?('id:')
      event[:id] = line[3..].strip
    end
  end

  event[:data] = data_lines.join("\n")
  has_content ? event : nil
end

#process_error_in_message(data) ⇒ Boolean

Process a JSON-RPC error object received in the SSE stream.

Parameters:

  • data (Hash)

    the parsed JSON payload

Returns:

  • (Boolean)

    true if we saw & handled an error



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 49

# Inspect a parsed JSON-RPC payload for an 'error' member and log it.
#
# When authorization_error? accepts the message/code pair, the message is
# passed to handle_sse_auth_error_message before logging.
#
# @param data [Hash] the parsed JSON payload
# @return [Boolean] true if an error was seen and handled, false otherwise
def process_error_in_message(data)
  # Explicit false (rather than nil) keeps the return value a real Boolean,
  # as documented and as the other process_* predicates do.
  return false unless data['error']

  error_message = data['error']['message'] || 'Unknown server error'
  error_code    = data['error']['code']

  handle_sse_auth_error_message(error_message) if authorization_error?(error_message, error_code)

  @logger.error("Server error: #{error_message}")
  true
end

#process_notification?(data) ⇒ Boolean

Process a JSON-RPC notification (no id => notification)

Parameters:

  • data (Hash)

    the parsed JSON payload

Returns:

  • (Boolean)

    true if we saw & handled a notification



64
65
66
67
68
69
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 64

# Deliver a JSON-RPC notification to the registered callback.
#
# A payload counts as a notification when it has a 'method' and no 'id' key.
# The callback (if one is set) receives the method name and the params.
#
# @param data [Hash] the parsed JSON payload
# @return [Boolean] true if the payload was a notification, false otherwise
def process_notification?(data)
  rpc_method = data['method']
  return false if !rpc_method || data.key?('id')

  @notification_callback&.call(rpc_method, data['params'])
  true
end

#process_response?(data) ⇒ Boolean

Process a JSON-RPC response (id => response)

Parameters:

  • data (Hash)

    the parsed JSON payload

Returns:

  • (Boolean)

    true if we saw & handled a response



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 74

# Record a JSON-RPC response under its id in @sse_results.
#
# When the payload has an 'error' member, a synthetic
# { 'isError' => true, 'content' => [...] } entry carrying the error as JSON
# text is stored; otherwise the raw 'result' is stored. If the result is a
# Hash with a 'tools' entry, that entry is also stored in @tools_data.
# All writes happen while holding @mutex.
#
# @param data [Hash] the parsed JSON payload
# @return [Boolean] true if the payload had an id and was stored, false otherwise
def process_response?(data)
  return false unless data['id']

  result = data['result']

  @mutex.synchronize do
    # A JSON-RPC result may be any JSON value; only look for 'tools' on a Hash
    # so array/scalar results are stored instead of raising and being dropped.
    tools = result['tools'] if result.is_a?(Hash)
    @tools_data = tools if tools

    @sse_results[data['id']] =
      if data['error']
        { 'isError' => true,
          'content' => [{ 'type' => 'text', 'text' => data['error'].to_json }] }
      else
        result
      end
  end

  true
end