🔄 fix: Improve MCP Connection Cleanup (#7400)

* chore: linting for mcp related modules

* fix: update `isConnected` method to return a Promise and handle connection state asynchronously to properly handle/cleanup disconnected user connections
This commit is contained in:
Danny Avila 2025-05-15 12:17:17 -04:00 committed by GitHub
parent 535e7798b3
commit fe311df969
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 29 additions and 12 deletions

View file

@ -85,7 +85,10 @@ export const SSEOptionsSchema = BaseOptionsSchema.extend({
export const StreamableHTTPOptionsSchema = BaseOptionsSchema.extend({
type: z.literal('streamable-http'),
headers: z.record(z.string(), z.string()).optional(),
url: z.string().url().refine(
url: z
.string()
.url()
.refine(
(val) => {
const protocol = new URL(val).protocol;
return protocol !== 'ws:' && protocol !== 'wss:';
@ -93,7 +96,7 @@ export const StreamableHTTPOptionsSchema = BaseOptionsSchema.extend({
{
message: 'Streamable HTTP URL must not start with ws:// or wss://',
},
),
),
});
export const MCPOptionsSchema = z.union([

View file

@ -567,8 +567,14 @@ export class MCPConnection extends EventEmitter {
return this.connectionState;
}
public isConnected(): boolean {
return this.connectionState === 'connected';
public async isConnected(): Promise<boolean> {
try {
await this.client.ping();
return this.connectionState === 'connected';
} catch (error) {
this.logger?.error(`${this.getLogPrefix()} Ping failed:`, error);
return false;
}
}
public getLastError(): Error | null {

View file

@ -71,7 +71,7 @@ export class MCPManager {
const connectionAttempt = this.initializeServer(connection, `[MCP][${serverName}]`);
await Promise.race([connectionAttempt, connectionTimeout]);
if (connection.isConnected()) {
if (await connection.isConnected()) {
initializedServers.add(i);
this.connections.set(serverName, connection); // Store in app-level map
@ -135,7 +135,7 @@ export class MCPManager {
while (attempts < maxAttempts) {
try {
await connection.connect();
if (connection.isConnected()) {
if (await connection.isConnected()) {
return;
}
throw new Error('Connection attempt succeeded but status is not connected');
@ -200,7 +200,7 @@ export class MCPManager {
}
connection = undefined; // Force creation of a new connection
} else if (connection) {
if (connection.isConnected()) {
if (await connection.isConnected()) {
this.logger.debug(`[MCP][User: ${userId}][${serverName}] Reusing active connection`);
// Update timestamp on reuse
this.updateUserLastActivity(userId);
@ -244,7 +244,7 @@ export class MCPManager {
);
await Promise.race([connectionAttempt, connectionTimeout]);
if (!connection.isConnected()) {
if (!(await connection.isConnected())) {
throw new Error('Failed to establish connection after initialization attempt.');
}
@ -342,7 +342,7 @@ export class MCPManager {
public async mapAvailableTools(availableTools: t.LCAvailableTools): Promise<void> {
for (const [serverName, connection] of this.connections.entries()) {
try {
if (connection.isConnected() !== true) {
if ((await connection.isConnected()) !== true) {
this.logger.warn(
`[MCP][${serverName}] Connection not established. Skipping tool mapping.`,
);
@ -375,7 +375,7 @@ export class MCPManager {
for (const [serverName, connection] of this.connections.entries()) {
try {
if (connection.isConnected() !== true) {
if ((await connection.isConnected()) !== true) {
this.logger.warn(
`[MCP][${serverName}] Connection not established. Skipping manifest loading.`,
);
@ -443,7 +443,7 @@ export class MCPManager {
}
}
if (!connection.isConnected()) {
if (!(await connection.isConnected())) {
// This might happen if getUserConnection failed silently or app connection dropped
throw new McpError(
ErrorCode.InternalError, // Use InternalError for connection issues

View file

@ -1,5 +1,13 @@
import type * as t from './types/mcp';
const RECOGNIZED_PROVIDERS = new Set(['google', 'anthropic', 'openai', 'openrouter', 'xai', 'deepseek', 'ollama']);
const RECOGNIZED_PROVIDERS = new Set([
'google',
'anthropic',
'openai',
'openrouter',
'xai',
'deepseek',
'ollama',
]);
const CONTENT_ARRAY_PROVIDERS = new Set(['google', 'anthropic', 'openai']);
const imageFormatters: Record<string, undefined | t.ImageFormatter> = {