Class: ActionMCP::ApplicationController

Inherits:
ActionController::API
  • Object
show all
Includes:
ActionController::Instrumentation, ActionController::Live, JSONRPC_Rails::ControllerHelpers
Defined in:
app/controllers/action_mcp/application_controller.rb

Overview

Implements the MCP endpoints according to the 2025-03-26 specification. Supports GET for server-initiated SSE streams, POST for client messages (responding with JSON or SSE), and optionally DELETE for session termination.

Constant Summary collapse

MCP_SESSION_ID_HEADER =
"Mcp-Session-Id"

Instance Method Summary collapse

Instance Method Details

#create ⇒ Object

Handles POST requests containing client JSON-RPC messages according to 2025-03-26 spec. <rails-lens:routes:begin> ROUTE: /, name: mcp_post, via: POST <rails-lens:routes:end>



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'app/controllers/action_mcp/application_controller.rb', line 166

def create
  unless post_accept_headers_valid?
    id = extract_jsonrpc_id_from_request
    return render_not_acceptable(post_accept_headers_error_message, id)
  end

  # Reject JSON-RPC batch requests as per MCP 2025-06-18 spec
  return render_bad_request("JSON-RPC batch requests are not supported", nil) if jsonrpc_params_batch?

  is_initialize_request = check_if_initialize_request(jsonrpc_params)
  session_initially_missing = extract_session_id.nil?
  session = mcp_session

  # Validate MCP-Protocol-Version header for non-initialize requests
  return unless validate_protocol_version_header

  unless initialization_related_request?(jsonrpc_params)
    if session_initially_missing
      id = jsonrpc_params.respond_to?(:id) ? jsonrpc_params.id : nil
      return render_bad_request("Mcp-Session-Id header is required for this request.", id)
    elsif session.nil? || session.new_record?
      id = jsonrpc_params.respond_to?(:id) ? jsonrpc_params.id : nil
      return render_not_found("Session not found.", id)
    elsif session.status == "closed"
      id = jsonrpc_params.respond_to?(:id) ? jsonrpc_params.id : nil
      return render_not_found("Session has been terminated.", id)
    end
  end

  if session.new_record?
    session.save!
    response.headers[MCP_SESSION_ID_HEADER] = session.id
  end

  # Authenticate the request via gateway (skipped for initialization-related requests)
  if initialization_related_request?(jsonrpc_params)
    # Skipping authentication for initialization request: #{jsonrpc_params.method}
  else
    authenticate_gateway!
    return if performed?
  end

  # Use return mode for the transport handler when we need to capture responses
  transport_handler = Server::TransportHandler.new(session, messaging_mode: :return)
  json_rpc_handler = Server::JsonRpcHandler.new(transport_handler)

  result = json_rpc_handler.call(jsonrpc_params)
  process_handler_results(result, session, session_initially_missing, is_initialize_request)
rescue ActionController::Live::ClientDisconnected, IOError => e
  if ActionMCP.configuration.verbose_logging
    Rails.logger.debug "Unified SSE (POST): Client disconnected during response: #{e.message}"
  end
  begin
    response.stream&.close
  rescue StandardError
    nil
  end
rescue StandardError => e
  Rails.logger.error "Unified POST Error: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}"
  id = begin
    jsonrpc_params.respond_to?(:id) ? jsonrpc_params.id : nil
  rescue StandardError
    nil
  end
  render_internal_server_error("An unexpected error occurred.", id) unless performed?
end

#destroy ⇒ Object

Handles DELETE requests for session termination (2025-03-26 spec). <rails-lens:routes:begin> ROUTE: /, name: mcp_delete, via: DELETE <rails-lens:routes:end>



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'app/controllers/action_mcp/application_controller.rb', line 237

# Handles DELETE requests that terminate a session.
#
# Renders "bad request" when no session id can be extracted and "not found"
# when the session store has no such session. A session whose status is
# already "closed" is answered with 204 without authenticating. Otherwise the
# request is authenticated through the gateway and the session is closed; a
# failure while closing is logged and rendered as an internal server error.
def destroy
  header_session_id = extract_session_id
  unless header_session_id
    return render_bad_request("Mcp-Session-Id header is required for DELETE requests.")
  end

  session = Server.session_store.load_session(header_session_id)
  return render_not_found("Session not found.") if session.nil?
  # Closing twice is treated as success rather than an error.
  return head(:no_content) if session.status == "closed"

  # Authenticate the request via gateway
  authenticate_gateway!
  return if performed?

  begin
    session.close!
    if ActionMCP.configuration.verbose_logging
      Rails.logger.info "Unified DELETE: Terminated session: #{session.id}"
    end
    head :no_content
  rescue StandardError => e
    Rails.logger.error "Unified DELETE: Error terminating session #{session.id}: #{e.class} - #{e.message}"
    render_internal_server_error("Failed to terminate session.")
  end
end

#mcp_session ⇒ ActionMCP::Session

Provides the ActionMCP::Session for the current request. Handles finding existing sessions via header/param or initializing a new one. Specific controllers/handlers might need to enforce session ID presence based on context.

Returns:

  • (ActionMCP::Session)

    The session for the current request.



19
20
21
# File 'app/controllers/action_mcp/application_controller.rb', line 19

# Returns the session for the current request, resolving it through
# +find_or_initialize_session+ on first use and caching the result in
# +@mcp_session+ for later calls.
def mcp_session
  return @mcp_session if @mcp_session

  @mcp_session = find_or_initialize_session
end

#session_key ⇒ String

Provides a unique key for caching or pub/sub based on the session ID. Ensures mcp_session is called first to establish the session ID.

Returns:

  • (String)

    The session key string.



26
27
28
# File 'app/controllers/action_mcp/application_controller.rb', line 26

# Returns the "action_mcp-sessions-<id>" key for the current session. The
# string is built from +mcp_session.id+ on first use and cached in
# +@session_key+.
def session_key
  return @session_key if @session_key

  @session_key = "action_mcp-sessions-#{mcp_session.id}"
end

#show ⇒ Object

Handles GET requests for establishing server-initiated SSE streams (2025-03-26 spec). <rails-lens:routes:begin> ROUTE: /, name: mcp_get, via: GET <rails-lens:routes:end>



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'app/controllers/action_mcp/application_controller.rb', line 34

# Handles GET requests that open a server-initiated SSE stream.
#
# The request is rejected unless it accepts "text/event-stream", carries a
# session id, and refers to a persisted, initialized, non-closed session; it
# is then authenticated through the gateway.
#
# Once accepted, the action:
#   * sets the SSE response headers (including MCP-Protocol-Version taken
#     from the session),
#   * starts an SSEListener whose messages are forwarded with
#     +write_sse_event+,
#   * replays events after a positive numeric Last-Event-ID, if one was sent,
#   * schedules a recurring "notifications/ping" heartbeat, and
#   * blocks until the connection is flagged inactive or the stream closes.
#
# The +ensure+ section always runs, including after the early rejections: it
# stops the heartbeat and listener, calls +cleanup_old_sse_events+ when a
# session was resolved, and closes the stream.
def show
  unless request.accepts.any? { |type| type.to_s == "text/event-stream" }
    return render_not_acceptable("Client must accept 'text/event-stream' for GET requests.")
  end

  session_id_from_header = extract_session_id
  return render_bad_request("Mcp-Session-Id header is required for GET requests.") unless session_id_from_header

  session = mcp_session
  if session.nil? || session.new_record?
    return render_not_found("Session not found.")
  elsif !session.initialized?
    return render_bad_request("Session is not fully initialized.")
  elsif session.status == "closed"
    return render_not_found("Session has been terminated.")
  end

  # Authenticate the request via gateway
  authenticate_gateway!
  return if performed?

  last_event_id = request.headers["Last-Event-ID"].presence
  if last_event_id && ActionMCP.configuration.verbose_logging
    Rails.logger.info "Unified SSE (GET): Resuming from Last-Event-ID: #{last_event_id}"
  end

  response.headers["Content-Type"] = "text/event-stream"
  response.headers["X-Accel-Buffering"] = "no"
  response.headers["Cache-Control"] = "no-cache"
  response.headers["Connection"] = "keep-alive"
  # Add MCP-Protocol-Version header for established sessions
  response.headers["MCP-Protocol-Version"] = session.protocol_version

  if ActionMCP.configuration.verbose_logging
    Rails.logger.info "Unified SSE (GET): Starting stream for session: #{session.id}"
  end

  sse = SSE.new(response.stream)
  listener = SSEListener.new(session)
  connection_active = Concurrent::AtomicBoolean.new(true)
  heartbeat_active = Concurrent::AtomicBoolean.new(true)
  heartbeat_task = nil

  listener_started = listener.start do |message|
    write_sse_event(sse, session, message)
  end

  unless listener_started
    Rails.logger.error "Unified SSE (GET): Listener failed to activate for session: #{session.id}"
    connection_active.make_false
    return
  end

  # nil and non-numeric ids both convert to 0, which skips the replay.
  resume_after = last_event_id.to_i
  if resume_after.positive?
    begin
      missed_events = session.get_sse_events_after(resume_after)
      if missed_events.any?
        if ActionMCP.configuration.verbose_logging
          Rails.logger.info "Unified SSE (GET): Sending #{missed_events.size} missed events for session: #{session.id}"
        end
        missed_events.each do |event|
          sse.write(event.to_sse)
        end
      elsif ActionMCP.configuration.verbose_logging
        Rails.logger.info "Unified SSE (GET): No missed events to send for session: #{session.id}"
      end
    rescue StandardError => e
      # A failed replay is logged but does not abort the live stream.
      Rails.logger.error "Unified SSE (GET): Error sending missed events: #{e.message}"
    end
  end

  heartbeat_interval = ActionMCP.configuration.sse_heartbeat_interval || 15.seconds
  # Self-rescheduling heartbeat: each run sends one ping and, on success,
  # schedules the next run.
  heartbeat_sender = lambda do
    if connection_active.true? && !response.stream.closed?
      begin
        # Send a proper JSON-RPC notification for heartbeat
        ping_notification = {
          jsonrpc: "2.0",
          method: "notifications/ping",
          params: {}
        }
        future = Concurrent::Promises.future { write_sse_event(sse, session, ping_notification) }
        # wait!(timeout) returns false when the write has not finished in time
        # and re-raises the write's error when it failed. value!(timeout)
        # cannot be used here: it returns nil on timeout instead of raising,
        # so a stalled write would be indistinguishable from a successful one.
        if future.wait!(5)
          if heartbeat_active.true?
            heartbeat_task = Concurrent::ScheduledTask.execute(heartbeat_interval, &heartbeat_sender)
          end
        else
          Rails.logger.warn "Unified SSE (GET): Heartbeat timed out for session: #{session.id}, closing."
          connection_active.make_false
        end
      rescue StandardError => e
        if ActionMCP.configuration.verbose_logging
          Rails.logger.debug "Unified SSE (GET): Heartbeat error for session: #{session.id}: #{e.message}"
        end
        connection_active.make_false
      end
    else
      heartbeat_active.make_false
    end
  end

  heartbeat_task = Concurrent::ScheduledTask.execute(heartbeat_interval, &heartbeat_sender)
  # Keep the action (and therefore the stream) open until told otherwise.
  sleep 0.1 while connection_active.true? && !response.stream.closed?
rescue ActionController::Live::ClientDisconnected, IOError => e
  if ActionMCP.configuration.verbose_logging
    Rails.logger.debug "Unified SSE (GET): Client disconnected for session: #{session&.id}: #{e.message}"
  end
rescue StandardError => e
  Rails.logger.error "Unified SSE (GET): Unexpected error for session: #{session&.id}: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}"
ensure
  # Locals assigned above are nil here when the action returned early, hence
  # the safe-navigation calls.
  if ActionMCP.configuration.verbose_logging
    Rails.logger.debug "Unified SSE (GET): Cleaning up connection for session: #{session&.id}"
  end
  heartbeat_active&.make_false
  heartbeat_task&.cancel
  listener&.stop
  cleanup_old_sse_events(session) if session
  sse&.close
  begin
    response.stream&.close
  rescue StandardError
    nil
  end
end