Class: VectorMCP::Transport::Stdio

Inherits:
Object
  • Object
Defined in:
lib/vector_mcp/transport/stdio.rb

Overview

Implements the Model Context Protocol transport over standard input/output (stdio). This transport reads JSON-RPC messages line-by-line from `$stdin` and writes responses/notifications line-by-line to `$stdout`.

It is suitable for inter-process communication on the same machine where a parent process spawns an MCP server and communicates with it via its stdio streams.
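To make the line-by-line framing concrete, here is a minimal sketch of a loop that reads one JSON-RPC message per line and writes one response per line. The `echo_loop` helper and the sample messages are illustrative assumptions, not part of VectorMCP; `StringIO` objects stand in for `$stdin` and `$stdout` so the sketch runs on its own.

```ruby
require "json"
require "stringio"

# Hypothetical helper, not part of VectorMCP: reads one JSON-RPC message per
# line from `input` and writes one JSON response per line to `output`.
def echo_loop(input, output)
  input.each_line do |line|
    line = line.strip
    next if line.empty?

    message = JSON.parse(line)
    # Notifications carry no "id" and must not be answered.
    next unless message.key?("id")

    response = { jsonrpc: "2.0", id: message["id"], result: { echoed: message["method"] } }
    output.puts(response.to_json)
  end
end

# Two sample lines: a request (has an id) and a notification (no id).
input = StringIO.new(<<~MESSAGES)
  {"jsonrpc":"2.0","id":1,"method":"ping"}
  {"jsonrpc":"2.0","method":"notifications/initialized"}
MESSAGES
output = StringIO.new
echo_loop(input, output)
```

In a real parent/child setup the same loop would be handed `$stdin` and `$stdout` instead of the `StringIO` objects.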

Constant Summary

DEFAULT_REQUEST_TIMEOUT = 30

Timeout for waiting for a response to a server-initiated request (in seconds).

Constructor Details

#initialize(server, options = {}) ⇒ Stdio

Initializes a new Stdio transport.

Parameters:

  • server (VectorMCP::Server)

    The server instance that will handle messages.

  • options (Hash) (defaults to: {})

    Optional configuration options.

Options Hash (options):

  • :enable_session_manager (Boolean) — default: false

    Whether to enable the unified session manager.



# File 'lib/vector_mcp/transport/stdio.rb', line 35

def initialize(server, options = {})
  @server = server
  @logger = server.logger
  @session_manager = options[:enable_session_manager] ? StdioSessionManager.new(self) : nil
  @input_mutex = Mutex.new
  @output_mutex = Mutex.new
  @running = false
  @input_thread = nil
  @shutdown_requested = false
  @outgoing_request_responses = {} # To store responses for server-initiated requests
  @outgoing_request_conditions = {} # ConditionVariables for server-initiated requests
  @mutex = Mutex.new # To synchronize access to shared response data
  @request_id_generator = Enumerator.new do |y|
    i = 0
    loop { y << "vecmcp_stdio_#{i += 1}_#{SecureRandom.hex(4)}" }
  end
end
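The request ID generator in the constructor can be tried in isolation. The sketch below copies the same `Enumerator` construction into a local variable (`request_ids` is a name chosen for this illustration) and pulls two IDs from it.

```ruby
require "securerandom"

# Same construction as in the constructor above, extracted so it runs alone.
request_ids = Enumerator.new do |y|
  i = 0
  # Each ID combines a running counter with 4 random bytes (8 hex characters).
  loop { y << "vecmcp_stdio_#{i += 1}_#{SecureRandom.hex(4)}" }
end

first_id = request_ids.next
second_id = request_ids.next
```

The counter keeps IDs ordered within one transport instance, and the random suffix makes collisions with IDs from other sources unlikely.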

Instance Attribute Details

#logger ⇒ Logger (readonly)

Returns the logger instance, shared with the server.

Returns:

  • (Logger)

    The logger instance, shared with the server.



# File 'lib/vector_mcp/transport/stdio.rb', line 23

def logger
  @logger
end

#server ⇒ VectorMCP::Server (readonly)

Returns the server instance this transport is bound to.

Returns:

  • (VectorMCP::Server)

    The server instance this transport is bound to.



# File 'lib/vector_mcp/transport/stdio.rb', line 21

def server
  @server
end

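#session_manager ⇒ StdioSessionManager (readonly)

Returns the session manager for this transport.

Returns:

  • (StdioSessionManager, nil)

    The session manager for this transport, or nil when the :enable_session_manager option was not set.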



# File 'lib/vector_mcp/transport/stdio.rb', line 25

def session_manager
  @session_manager
end

Instance Method Details

#broadcast_notification(method, params = nil) ⇒ Integer

Broadcasts a JSON-RPC notification message to all sessions. For stdio transport, this behaves the same as send_notification since there’s only one session.

Parameters:

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).

Returns:

  • (Integer)

    Number of sessions the notification was sent to (always 1 for stdio).



# File 'lib/vector_mcp/transport/stdio.rb', line 150

def broadcast_notification(method, params = nil)
  send_notification(method, params)
  1
end

#notification_sent_to_session?(_session_id, method, params = nil) ⇒ Boolean

Sends a JSON-RPC notification message to a specific session. For stdio transport, this behaves the same as send_notification since there’s only one session.

Parameters:

  • _session_id (String)

    The session ID (ignored for stdio transport).

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).

Returns:

  • (Boolean)

    True if the notification was sent successfully.



# File 'lib/vector_mcp/transport/stdio.rb', line 139

def notification_sent_to_session?(_session_id, method, params = nil)
  send_notification(method, params)
  true
end

#run ⇒ void

This method returns an undefined value.

Starts the stdio transport, listening for input and processing messages. This method will block until the input stream is closed or an interrupt is received.



# File 'lib/vector_mcp/transport/stdio.rb', line 57

def run
  session = create_session
  logger.info("Starting stdio transport")
  @running = true

  begin
    launch_input_thread(session)
    @input_thread.join
  rescue Interrupt
    logger.info("Interrupted. Shutting down...")
  ensure
    shutdown_transport
  end
end

#send_error(id, code, message, data = nil) ⇒ void

This method returns an undefined value.

Sends a JSON-RPC error response message.

Parameters:

  • id (String, Integer, nil)

    The ID of the request that caused the error.

  • code (Integer)

    The JSON-RPC error code.

  • message (String)

    A short description of the error.

  • data (Object, nil) (defaults to: nil)

    Additional error data (optional).



# File 'lib/vector_mcp/transport/stdio.rb', line 93

def send_error(id, code, message, data = nil)
  error_obj = { code: code, message: message }
  error_obj[:data] = data if data
  response = {
    jsonrpc: "2.0",
    id: id,
    error: error_obj
  }
  write_message(response)
end
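As an illustration of what ends up on the wire, the sketch below builds the same hash shape as `send_error` and serializes it. `build_error` is a hypothetical helper used only here; the two error codes are the standard JSON-RPC 2.0 codes for "Method not found" and "Invalid params".

```ruby
require "json"

# Hypothetical helper mirroring the hash built by send_error above.
def build_error(id, code, message, data = nil)
  error_obj = { code: code, message: message }
  # :data is only included when a truthy value is given.
  error_obj[:data] = data if data
  { jsonrpc: "2.0", id: id, error: error_obj }
end

# -32601 is the JSON-RPC 2.0 code for "Method not found".
without_data = build_error(7, -32601, "Method not found").to_json
# -32602 is the JSON-RPC 2.0 code for "Invalid params".
with_data = build_error(7, -32602, "Invalid params", { field: "name" }).to_json
```

Because of the `if data` guard, passing `false` as data omits the key just as `nil` does.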

#send_notification(method, params = nil) ⇒ void

This method returns an undefined value.

Sends a JSON-RPC notification message (a request without an ID).

Parameters:

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).



# File 'lib/vector_mcp/transport/stdio.rb', line 109

def send_notification(method, params = nil)
  notification = {
    jsonrpc: "2.0",
    method: method
  }
  notification[:params] = params if params
  write_message(notification)
end
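Since a notification is a request without an ID, the receiving side can tell message kinds apart by which keys are present. The following is a rough sketch of that distinction under JSON-RPC 2.0 rules; `classify` is a hypothetical helper and the method names are example values, none of it is VectorMCP API.

```ruby
require "json"

# Hypothetical classifier, not part of VectorMCP.
def classify(line)
  message = JSON.parse(line)
  if message.key?("method")
    # A method with an id expects a reply; without an id it is a notification.
    message.key?("id") ? :request : :notification
  elsif message.key?("result") || message.key?("error")
    :response
  else
    :invalid
  end
end

# Same shape as the hash built by send_notification above.
notification = { jsonrpc: "2.0", method: "notifications/progress" }
notification[:params] = { progress: 50 }

kinds = [
  classify(notification.to_json),
  classify({ jsonrpc: "2.0", id: 1, method: "ping" }.to_json),
  classify({ jsonrpc: "2.0", id: 1, result: {} }.to_json)
]
```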

#send_notification_to_session(_session_id, method, params = nil) ⇒ Boolean

Sends a JSON-RPC notification message to a specific session. For stdio transport, this behaves the same as send_notification since there’s only one session.

Parameters:

  • _session_id (String)

    The session ID (ignored for stdio transport).

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).

Returns:

  • (Boolean)

    True if the notification was sent successfully.



# File 'lib/vector_mcp/transport/stdio.rb', line 126

def send_notification_to_session(_session_id, method, params = nil)
  send_notification(method, params)
  true
end

#send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object

Sends a server-initiated JSON-RPC request to the client and waits for a response. This is a blocking call.

Parameters:

  • method (String)

    The request method name.

  • params (Hash, Array, nil) (defaults to: nil)

    The request parameters.

  • timeout (Numeric) (defaults to: DEFAULT_REQUEST_TIMEOUT)

    How long to wait for a response, in seconds.

Returns:

  • (Object)

    The result part of the client’s response.

Raises:

  • (ArgumentError)

    If method is blank (nil, empty, or whitespace only).



# File 'lib/vector_mcp/transport/stdio.rb', line 164

def send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT)
  raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty?

  request_id = @request_id_generator.next
  request_payload = { jsonrpc: "2.0", id: request_id, method: method }
  request_payload[:params] = params if params

  setup_request_tracking(request_id)
  # Sending request to client
  write_message(request_payload)

  response = wait_for_response(request_id, method, timeout)
  process_response(response, request_id, method)
end
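The bodies of the private helpers called above (`setup_request_tracking`, `wait_for_response`, `process_response`) are not shown on this page. One plausible way to implement a blocking wait with a timeout, given the `Mutex`, response hash, and `ConditionVariable` hash set up in the constructor, is sketched below. The `PendingRequests` class and all of its method names are assumptions made for illustration, not the library's actual code.

```ruby
# Hypothetical stand-in for the request tracking behind send_request.
class PendingRequests
  def initialize
    @mutex = Mutex.new
    @responses = {}
    @conditions = {}
  end

  # Register a request before it is written, so an early reply is not lost.
  def track(request_id)
    @mutex.synchronize { @conditions[request_id] = ConditionVariable.new }
  end

  # Called by the reader thread when a response line arrives.
  def deliver(request_id, response)
    @mutex.synchronize do
      condition = @conditions[request_id]
      next unless condition # unknown or already timed-out request

      @responses[request_id] = response
      condition.signal
    end
  end

  # Blocks the caller until a response arrives or `timeout` seconds pass.
  # Returns the response, or nil on timeout.
  def wait(request_id, timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      # Loop to guard against spurious wakeups.
      until @responses.key?(request_id)
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0

        @conditions[request_id].wait(@mutex, remaining)
      end
      @conditions.delete(request_id)
      @responses.delete(request_id)
    end
  end
end

pending = PendingRequests.new

# Case 1: a reader thread delivers the response while the caller waits.
pending.track("req-1")
reader = Thread.new do
  sleep 0.05
  pending.deliver("req-1", { "result" => { "ok" => true } })
end
answer = pending.wait("req-1", 1)
reader.join

# Case 2: nobody answers, so the wait gives up after the timeout.
pending.track("req-2")
timed_out = pending.wait("req-2", 0.05)
```

Registering the condition variable before writing the request matters: if the client answers very quickly, the reader thread already has somewhere to put the response.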

#send_response(id, result) ⇒ void

This method returns an undefined value.

Sends a JSON-RPC response message for a given request ID.

Parameters:

  • id (String, Integer, nil)

    The ID of the request being responded to.

  • result (Object)

    The result data for the successful request.



# File 'lib/vector_mcp/transport/stdio.rb', line 77

def send_response(id, result)
  response = {
    jsonrpc: "2.0",
    id: id,
    result: result
  }
  write_message(response)
end

#shutdown ⇒ void

This method returns an undefined value.

Initiates an immediate shutdown of the transport. Sets the running flag to false and attempts to kill the input reading thread.



# File 'lib/vector_mcp/transport/stdio.rb', line 183

def shutdown
  logger.info("Shutdown requested for stdio transport.")
  @running = false
  @input_thread&.kill if @input_thread&.alive?
end
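The effect of killing a reader thread that is blocked on input can be observed with a small stand-alone sketch. Here an `IO.pipe` with no data stands in for `$stdin`; the variable names are chosen for this illustration only.

```ruby
# A reader thread blocked on a pipe that never receives data,
# standing in for the input thread blocked on $stdin.
read_end, write_end = IO.pipe
reader = Thread.new { read_end.gets }
sleep 0.05 # give the thread time to block on gets

alive_before = reader.alive?
# Same guard as in shutdown above: only kill a thread that is still running.
reader.kill if reader.alive?
reader.join # wait until the thread has actually terminated
alive_after = reader.alive?

write_end.close
read_end.close
```

Note that `kill` only requests termination; joining the thread afterwards, as done here, is what confirms it has stopped.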