Module: MCPClient::ServerSSE::SseParser
- Included in:
- MCPClient::ServerSSE
- Defined in:
- lib/mcp_client/server_sse/sse_parser.rb
Overview
Wire-level SSE parsing & dispatch ===
Instance Method Summary collapse
-
#handle_endpoint_event(data) ⇒ Object
Handle the special “endpoint” control frame (for SSE handshake).
-
#handle_message_event(event) ⇒ Object
Handle a “message” SSE event (payload is JSON-RPC over SSE).
-
#parse_and_handle_sse_event(event_data) ⇒ Object
Parse and handle a raw SSE event payload.
-
#parse_sse_event(event_data) ⇒ Hash?
Parse a raw SSE chunk into its :event, :data, :id fields.
-
#process_error_in_message(data) ⇒ Boolean
Process a JSON-RPC error() in the SSE stream.
-
#process_notification?(data) ⇒ Boolean
Process a JSON-RPC notification (no id => notification).
-
#process_response?(data) ⇒ Boolean
Process a JSON-RPC response (id => response).
Instance Method Details
#handle_endpoint_event(data) ⇒ Object
Handle the special “endpoint” control frame (for SSE handshake)
121 122 123 124 125 126 127 128 |
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 121 def handle_endpoint_event(data) @mutex.synchronize do @rpc_endpoint = data @sse_connected = true @connection_established = true @connection_cv.broadcast end end |
#handle_message_event(event) ⇒ Object
Handle a “message” SSE event (payload is JSON-RPC over SSE)
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 27 def (event) return if event[:data].empty? begin data = JSON.parse(event[:data]) return if (data) return if process_notification?(data) process_response?(data) rescue MCPClient::Errors::ConnectionError raise rescue JSON::ParserError => e @logger.warn("Failed to parse JSON from event data: #{e.}") rescue StandardError => e @logger.error("Error processing SSE event: #{e.}") end end |
#parse_and_handle_sse_event(event_data) ⇒ Object
Parse and handle a raw SSE event payload.
11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 11 def parse_and_handle_sse_event(event_data) event = parse_sse_event(event_data) return if event.nil? case event[:event] when 'endpoint' handle_endpoint_event(event[:data]) when 'ping' # no-op when 'message' (event) end end |
#parse_sse_event(event_data) ⇒ Hash?
Parse a raw SSE chunk into its :event, :data, :id fields
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 95 def parse_sse_event(event_data) event = { event: 'message', data: '', id: nil } data_lines = [] has_content = false event_data.each_line do |line| line = line.chomp next if line.empty? # blank line next if line.start_with?(':') # SSE comment has_content = true if line.start_with?('event:') event[:event] = line[6..].strip elsif line.start_with?('data:') data_lines << line[5..].strip elsif line.start_with?('id:') event[:id] = line[3..].strip end end event[:data] = data_lines.join("\n") has_content ? event : nil end |
#process_error_in_message(data) ⇒ Boolean
Process a JSON-RPC error() in the SSE stream.
49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 49 def (data) return unless data['error'] = data['error']['message'] || 'Unknown server error' error_code = data['error']['code'] () if (, error_code) @logger.error("Server error: #{}") true end |
#process_notification?(data) ⇒ Boolean
Process a JSON-RPC notification (no id => notification)
64 65 66 67 68 69 |
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 64 def process_notification?(data) return false unless data['method'] && !data.key?('id') @notification_callback&.call(data['method'], data['params']) true end |
#process_response?(data) ⇒ Boolean
Process a JSON-RPC response (id => response)
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/mcp_client/server_sse/sse_parser.rb', line 74 def process_response?(data) return false unless data['id'] @mutex.synchronize do @tools_data = data['result']['tools'] if data['result'] && data['result']['tools'] @sse_results[data['id']] = if data['error'] { 'isError' => true, 'content' => [{ 'type' => 'text', 'text' => data['error'].to_json }] } else data['result'] end end true end |