diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 142c0302c..82b8c1255 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -14,6 +14,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicReference; @@ -136,6 +137,14 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { private final String latestSupportedProtocolVersion; + /** + * Stores the protocol version negotiated during the initialize handshake so that the + * GET SSE reconnect triggered by {@link #sendMessage} can use the correct version + * immediately, before the Reactor context is populated by + * {@code LifecycleInitializer}. + */ + private final AtomicReference negotiatedProtocolVersion = new AtomicReference<>(); + private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, @@ -277,7 +286,8 @@ private Mono reconnect(McpTransportStream stream) { .header("Cache-Control", "no-cache") .header(HttpHeaders.PROTOCOL_VERSION, connectionCtx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, - this.latestSupportedProtocolVersion)) + Optional.ofNullable(this.negotiatedProtocolVersion.get()) + .orElse(this.latestSupportedProtocolVersion))) .GET(); var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext)); @@ -450,6 +460,39 @@ else if (contentType.contains(APPLICATION_JSON)) { } + /** + * Attempts to parse a {@code protocolVersion} from the initialize response body. This + * is needed because the Reactor context is not yet populated by + * {@code LifecycleInitializer} at the time the first GET reconnect is triggered. + */ + @SuppressWarnings("unchecked") + private Optional extractProtocolVersion(ResponseSubscribers.ResponseEvent responseEvent) { + String data = null; + if (responseEvent instanceof ResponseSubscribers.AggregateResponseEvent agg) { + data = agg.data(); + } + else if (responseEvent instanceof ResponseSubscribers.SseResponseEvent sse) { + data = sse.sseEvent().data(); + } + if (data == null || data.isBlank()) { + return Optional.empty(); + } + try { + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, data); + if (message instanceof McpSchema.JSONRPCResponse response + && response.result() instanceof Map result) { + Object version = result.get("protocolVersion"); + if (version instanceof String v && !v.isBlank()) { + return Optional.of(v); + } + } + } + catch (Exception ignored) { + // Best-effort; the context-based fallback in reconnect() still applies. + } + return Optional.empty(); + } + public String toString(McpSchema.JSONRPCMessage message) { try { return this.jsonMapper.writeValueAsString(message); @@ -514,7 +557,11 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) { // Once we have a session, we try to open an async stream for // the server to send notifications and requests out-of-band. - + // Extract the negotiated protocol version from the initialize + // response body before triggering the GET reconnect, since the + // Reactor context is not yet populated by LifecycleInitializer + // at this point in the reactive chain. + extractProtocolVersion(responseEvent).ifPresent(this.negotiatedProtocolVersion::set); reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java index 0fb2fa778..9d2609c90 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java @@ -386,6 +386,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) } try { + request.setCharacterEncoding(UTF_8); BufferedReader reader = request.getReader(); StringBuilder body = new StringBuilder(); String line; diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStatelessServerTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStatelessServerTransport.java index 047aeebe8..aa0d62351 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStatelessServerTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStatelessServerTransport.java @@ -154,6 +154,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) } try { + request.setCharacterEncoding(UTF_8); BufferedReader reader = request.getReader(); StringBuilder body = new StringBuilder(); String line; diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java index fe38b2589..7e28c2b08 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java @@ -428,6 +428,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) McpTransportContext transportContext = this.contextExtractor.extract(request); try { + request.setCharacterEncoding(UTF_8); BufferedReader reader = request.getReader(); StringBuilder body = new StringBuilder(); String line; diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java index 29eef1410..d8b3b0696 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java @@ -103,11 +103,9 @@ void usesServerSupportedVersion() { McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of())); var calls = requestRecordingFilter.getCalls(); - // Initialize tells the server the Client's latest supported version - // FIXME: Set the correct protocol version on GET /mcp - assertThat(calls).filteredOn(c -> c.method().equals("POST") && !c.body().contains("\"method\":\"initialize\"")) - // POST notification/initialized ; POST tools/call - .hasSize(2) + assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\"")) + // GET /mcp ; POST notification/initialized ; POST tools/call + .hasSize(3) .map(McpTestRequestRecordingServletFilter.Call::headers) .allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version", ProtocolVersions.MCP_2025_11_25));