Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -136,6 +137,14 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {

private final String latestSupportedProtocolVersion;

/**
* Stores the protocol version negotiated during the initialize handshake so that the
* GET SSE reconnect triggered by {@link #sendMessage} can use the correct version
* immediately, before the Reactor context is populated by
* {@code LifecycleInitializer}.
*/
private final AtomicReference<String> negotiatedProtocolVersion = new AtomicReference<>();

private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
Expand Down Expand Up @@ -277,7 +286,8 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
.header("Cache-Control", "no-cache")
.header(HttpHeaders.PROTOCOL_VERSION,
connectionCtx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
this.latestSupportedProtocolVersion))
Optional.ofNullable(this.negotiatedProtocolVersion.get())
.orElse(this.latestSupportedProtocolVersion)))
.GET();
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
Expand Down Expand Up @@ -450,6 +460,39 @@ else if (contentType.contains(APPLICATION_JSON)) {

}

/**
* Attempts to parse a {@code protocolVersion} from the initialize response body. This
* is needed because the Reactor context is not yet populated by
* {@code LifecycleInitializer} at the time the first GET reconnect is triggered.
*/
@SuppressWarnings("unchecked")
private Optional<String> extractProtocolVersion(ResponseSubscribers.ResponseEvent responseEvent) {
String data = null;
if (responseEvent instanceof ResponseSubscribers.AggregateResponseEvent agg) {
data = agg.data();
}
else if (responseEvent instanceof ResponseSubscribers.SseResponseEvent sse) {
data = sse.sseEvent().data();
}
if (data == null || data.isBlank()) {
return Optional.empty();
}
try {
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, data);
if (message instanceof McpSchema.JSONRPCResponse response
&& response.result() instanceof Map<?, ?> result) {
Object version = result.get("protocolVersion");
if (version instanceof String v && !v.isBlank()) {
return Optional.of(v);
}
}
}
catch (Exception ignored) {
// Best-effort; the context-based fallback in reconnect() still applies.
}
return Optional.empty();
}

public String toString(McpSchema.JSONRPCMessage message) {
try {
return this.jsonMapper.writeValueAsString(message);
Expand Down Expand Up @@ -514,7 +557,11 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
// Once we have a session, we try to open an async stream for
// the server to send notifications and requests out-of-band.

// Extract the negotiated protocol version from the initialize
// response body before triggering the GET reconnect, since the
// Reactor context is not yet populated by LifecycleInitializer
// at this point in the reactive chain.
extractProtocolVersion(responseEvent).ifPresent(this.negotiatedProtocolVersion::set);
reconnect(null).contextWrite(deliveredSink.contextView()).subscribe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
}

try {
request.setCharacterEncoding(UTF_8);
BufferedReader reader = request.getReader();
StringBuilder body = new StringBuilder();
String line;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
}

try {
request.setCharacterEncoding(UTF_8);
BufferedReader reader = request.getReader();
StringBuilder body = new StringBuilder();
String line;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
McpTransportContext transportContext = this.contextExtractor.extract(request);

try {
request.setCharacterEncoding(UTF_8);
BufferedReader reader = request.getReader();
StringBuilder body = new StringBuilder();
String line;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,9 @@ void usesServerSupportedVersion() {
McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));

var calls = requestRecordingFilter.getCalls();
// Initialize tells the server the Client's latest supported version
// FIXME: Set the correct protocol version on GET /mcp
assertThat(calls).filteredOn(c -> c.method().equals("POST") && !c.body().contains("\"method\":\"initialize\""))
// POST notification/initialized ; POST tools/call
.hasSize(2)
assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\""))
// GET /mcp ; POST notification/initialized ; POST tools/call
.hasSize(3)
.map(McpTestRequestRecordingServletFilter.Call::headers)
.allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version",
ProtocolVersions.MCP_2025_11_25));
Expand Down