mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-19 01:40:15 +01:00
🔒 feat: Idempotency Check for OAuth Flow Completion (#10468)
* 🔒 feat: Implement idempotency check for OAuth flow completion
- Added a check to prevent duplicate token exchanges if the OAuth flow has already been completed.
- Updated the OAuth callback route to redirect appropriately when a completed flow is detected.
- Refactored token storage logic to use original flow state credentials instead of updated ones.
- Enhanced tests to cover the new idempotency behavior and ensure correct handling of OAuth flow states.
* chore: add back scope for logging
* refactor: Add isFlowStale method to FlowStateManager for stale flow detection
- Implemented a new method to check if a flow is stale based on its age and status.
- Updated MCPConnectionFactory to utilize the isFlowStale method for cleaning up stale OAuth flows.
- Enhanced logging to provide more informative messages regarding flow status and age during cleanup.
* test: Add unit tests for isFlowStale method in FlowStateManager
- Implemented comprehensive tests for the isFlowStale method to verify its behavior across various flow statuses (PENDING, COMPLETED, FAILED) and age thresholds.
- Ensured correct handling of edge cases, including flows with missing timestamps and custom stale thresholds.
- Enhanced test coverage to validate the logic for determining flow staleness based on createdAt, completedAt, and failedAt timestamps.
This commit is contained in:
parent
a49c509ebc
commit
dd35f42073
6 changed files with 380 additions and 72 deletions
|
|
@ -1,6 +1,6 @@
|
|||
import { Keyv } from 'keyv';
|
||||
import { FlowStateManager } from './manager';
|
||||
import type { FlowState } from './types';
|
||||
import { FlowState } from './types';
|
||||
|
||||
/** Mock class without extending Keyv */
|
||||
class MockKeyv {
|
||||
|
|
@ -181,4 +181,214 @@ describe('FlowStateManager', () => {
|
|||
expect(result).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('isFlowStale', () => {
|
||||
const flowId = 'test-flow-stale';
|
||||
const type = 'test-type';
|
||||
const flowKey = `${type}:${flowId}`;
|
||||
|
||||
it('returns not stale for non-existent flow', async () => {
|
||||
const result = await flowManager.isFlowStale(flowId, type);
|
||||
|
||||
expect(result).toEqual({
|
||||
isStale: false,
|
||||
age: 0,
|
||||
});
|
||||
});
|
||||
|
||||
it('returns not stale for PENDING flow regardless of age', async () => {
|
||||
const oldTimestamp = Date.now() - 10 * 60 * 1000; // 10 minutes ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'PENDING',
|
||||
metadata: {},
|
||||
createdAt: oldTimestamp,
|
||||
});
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
expect(result).toEqual({
|
||||
isStale: false,
|
||||
age: 0,
|
||||
status: 'PENDING',
|
||||
});
|
||||
});
|
||||
|
||||
it('returns not stale for recently COMPLETED flow', async () => {
|
||||
const recentTimestamp = Date.now() - 30 * 1000; // 30 seconds ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'COMPLETED',
|
||||
metadata: {},
|
||||
createdAt: Date.now() - 60 * 1000,
|
||||
completedAt: recentTimestamp,
|
||||
});
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
expect(result.isStale).toBe(false);
|
||||
expect(result.status).toBe('COMPLETED');
|
||||
expect(result.age).toBeGreaterThan(0);
|
||||
expect(result.age).toBeLessThan(60 * 1000);
|
||||
});
|
||||
|
||||
it('returns stale for old COMPLETED flow', async () => {
|
||||
const oldTimestamp = Date.now() - 5 * 60 * 1000; // 5 minutes ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'COMPLETED',
|
||||
metadata: {},
|
||||
createdAt: Date.now() - 10 * 60 * 1000,
|
||||
completedAt: oldTimestamp,
|
||||
});
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
expect(result.isStale).toBe(true);
|
||||
expect(result.status).toBe('COMPLETED');
|
||||
expect(result.age).toBeGreaterThan(2 * 60 * 1000);
|
||||
});
|
||||
|
||||
it('returns not stale for recently FAILED flow', async () => {
|
||||
const recentTimestamp = Date.now() - 30 * 1000; // 30 seconds ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'FAILED',
|
||||
metadata: {},
|
||||
createdAt: Date.now() - 60 * 1000,
|
||||
failedAt: recentTimestamp,
|
||||
error: 'Test error',
|
||||
});
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
expect(result.isStale).toBe(false);
|
||||
expect(result.status).toBe('FAILED');
|
||||
expect(result.age).toBeGreaterThan(0);
|
||||
expect(result.age).toBeLessThan(60 * 1000);
|
||||
});
|
||||
|
||||
it('returns stale for old FAILED flow', async () => {
|
||||
const oldTimestamp = Date.now() - 5 * 60 * 1000; // 5 minutes ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'FAILED',
|
||||
metadata: {},
|
||||
createdAt: Date.now() - 10 * 60 * 1000,
|
||||
failedAt: oldTimestamp,
|
||||
error: 'Test error',
|
||||
});
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
expect(result.isStale).toBe(true);
|
||||
expect(result.status).toBe('FAILED');
|
||||
expect(result.age).toBeGreaterThan(2 * 60 * 1000);
|
||||
});
|
||||
|
||||
it('uses custom stale threshold', async () => {
|
||||
const timestamp = Date.now() - 90 * 1000; // 90 seconds ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'COMPLETED',
|
||||
metadata: {},
|
||||
createdAt: Date.now() - 2 * 60 * 1000,
|
||||
completedAt: timestamp,
|
||||
});
|
||||
|
||||
// 90 seconds old, threshold 60 seconds = stale
|
||||
const result1 = await flowManager.isFlowStale(flowId, type, 60 * 1000);
|
||||
expect(result1.isStale).toBe(true);
|
||||
|
||||
// 90 seconds old, threshold 120 seconds = not stale
|
||||
const result2 = await flowManager.isFlowStale(flowId, type, 120 * 1000);
|
||||
expect(result2.isStale).toBe(false);
|
||||
});
|
||||
|
||||
it('uses default threshold of 2 minutes when not specified', async () => {
|
||||
const timestamp = Date.now() - 3 * 60 * 1000; // 3 minutes ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'COMPLETED',
|
||||
metadata: {},
|
||||
createdAt: Date.now() - 5 * 60 * 1000,
|
||||
completedAt: timestamp,
|
||||
});
|
||||
|
||||
// Should use default 2 minute threshold
|
||||
const result = await flowManager.isFlowStale(flowId, type);
|
||||
|
||||
expect(result.isStale).toBe(true);
|
||||
expect(result.age).toBeGreaterThan(2 * 60 * 1000);
|
||||
});
|
||||
|
||||
it('falls back to createdAt when completedAt/failedAt are not present', async () => {
|
||||
const createdTimestamp = Date.now() - 5 * 60 * 1000; // 5 minutes ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'COMPLETED',
|
||||
metadata: {},
|
||||
createdAt: createdTimestamp,
|
||||
// No completedAt or failedAt
|
||||
});
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
expect(result.isStale).toBe(true);
|
||||
expect(result.status).toBe('COMPLETED');
|
||||
expect(result.age).toBeGreaterThan(2 * 60 * 1000);
|
||||
});
|
||||
|
||||
it('handles flow with no timestamps', async () => {
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'COMPLETED',
|
||||
metadata: {},
|
||||
// No timestamps at all
|
||||
} as FlowState<string>);
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
expect(result.isStale).toBe(false);
|
||||
expect(result.age).toBe(0);
|
||||
expect(result.status).toBe('COMPLETED');
|
||||
});
|
||||
|
||||
it('prefers completedAt over createdAt for age calculation', async () => {
|
||||
const createdTimestamp = Date.now() - 10 * 60 * 1000; // 10 minutes ago
|
||||
const completedTimestamp = Date.now() - 30 * 1000; // 30 seconds ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'COMPLETED',
|
||||
metadata: {},
|
||||
createdAt: createdTimestamp,
|
||||
completedAt: completedTimestamp,
|
||||
});
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
// Should use completedAt (30s) not createdAt (10m)
|
||||
expect(result.isStale).toBe(false);
|
||||
expect(result.age).toBeLessThan(60 * 1000);
|
||||
});
|
||||
|
||||
it('prefers failedAt over createdAt for age calculation', async () => {
|
||||
const createdTimestamp = Date.now() - 10 * 60 * 1000; // 10 minutes ago
|
||||
const failedTimestamp = Date.now() - 30 * 1000; // 30 seconds ago
|
||||
await store.set(flowKey, {
|
||||
type,
|
||||
status: 'FAILED',
|
||||
metadata: {},
|
||||
createdAt: createdTimestamp,
|
||||
failedAt: failedTimestamp,
|
||||
error: 'Test error',
|
||||
});
|
||||
|
||||
const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000);
|
||||
|
||||
// Should use failedAt (30s) not createdAt (10m)
|
||||
expect(result.isStale).toBe(false);
|
||||
expect(result.age).toBeLessThan(60 * 1000);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -151,9 +151,25 @@ export class FlowStateManager<T = unknown> {
|
|||
const flowState = (await this.keyv.get(flowKey)) as FlowState<T> | undefined;
|
||||
|
||||
if (!flowState) {
|
||||
logger.warn('[FlowStateManager] Cannot complete flow - flow state not found', {
|
||||
flowId,
|
||||
type,
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Prevent duplicate completion */
|
||||
if (flowState.status === 'COMPLETED') {
|
||||
logger.debug(
|
||||
'[FlowStateManager] Flow already completed, skipping to prevent duplicate completion',
|
||||
{
|
||||
flowId,
|
||||
type,
|
||||
},
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
const updatedState: FlowState<T> = {
|
||||
...flowState,
|
||||
status: 'COMPLETED',
|
||||
|
|
@ -162,9 +178,55 @@ export class FlowStateManager<T = unknown> {
|
|||
};
|
||||
|
||||
await this.keyv.set(flowKey, updatedState, this.ttl);
|
||||
|
||||
logger.debug('[FlowStateManager] Flow completed successfully', {
|
||||
flowId,
|
||||
type,
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a flow is stale based on its age and status
|
||||
* @param flowId - The flow identifier
|
||||
* @param type - The flow type
|
||||
* @param staleThresholdMs - Age in milliseconds after which a non-pending flow is considered stale (default: 2 minutes)
|
||||
* @returns Object with isStale boolean and age in milliseconds
|
||||
*/
|
||||
async isFlowStale(
|
||||
flowId: string,
|
||||
type: string,
|
||||
staleThresholdMs: number = 2 * 60 * 1000,
|
||||
): Promise<{ isStale: boolean; age: number; status?: string }> {
|
||||
const flowKey = this.getFlowKey(flowId, type);
|
||||
const flowState = (await this.keyv.get(flowKey)) as FlowState<T> | undefined;
|
||||
|
||||
if (!flowState) {
|
||||
return { isStale: false, age: 0 };
|
||||
}
|
||||
|
||||
if (flowState.status === 'PENDING') {
|
||||
return { isStale: false, age: 0, status: flowState.status };
|
||||
}
|
||||
|
||||
const completedAt = flowState.completedAt || flowState.failedAt;
|
||||
const createdAt = flowState.createdAt;
|
||||
|
||||
let flowAge = 0;
|
||||
if (completedAt) {
|
||||
flowAge = Date.now() - completedAt;
|
||||
} else if (createdAt) {
|
||||
flowAge = Date.now() - createdAt;
|
||||
}
|
||||
|
||||
return {
|
||||
isStale: flowAge > staleThresholdMs,
|
||||
age: flowAge,
|
||||
status: flowState.status,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks a flow as failed
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -7,9 +7,9 @@ import type { FlowMetadata } from '~/flow/types';
|
|||
import type * as t from './types';
|
||||
import { MCPTokenStorage, MCPOAuthHandler } from '~/mcp/oauth';
|
||||
import { sanitizeUrlForLogging } from './utils';
|
||||
import { withTimeout } from '~/utils/promise';
|
||||
import { MCPConnection } from './connection';
|
||||
import { processMCPEnv } from '~/utils';
|
||||
import { withTimeout } from '~/utils/promise';
|
||||
|
||||
/**
|
||||
* Factory for creating MCP connections with optional OAuth authentication.
|
||||
|
|
@ -343,28 +343,42 @@ export class MCPConnectionFactory {
|
|||
`${this.logPrefix} OAuth flow completed, tokens received for ${this.serverName}`,
|
||||
);
|
||||
|
||||
// Re-fetch flow state after completion to get updated credentials
|
||||
const updatedFlowState = await MCPOAuthHandler.getFlowState(
|
||||
flowId,
|
||||
this.flowManager as FlowStateManager<MCPOAuthTokens>,
|
||||
);
|
||||
|
||||
/** Client information from the updated flow metadata */
|
||||
/** Client information from the existing flow metadata */
|
||||
const existingMetadata = existingFlow.metadata as unknown as MCPOAuthFlowMetadata;
|
||||
const clientInfo = updatedFlowState?.clientInfo || existingMetadata?.clientInfo;
|
||||
const clientInfo = existingMetadata?.clientInfo;
|
||||
|
||||
return { tokens, clientInfo };
|
||||
}
|
||||
|
||||
// Clean up old completed flows: createFlow() may return cached results otherwise
|
||||
// Clean up old completed/failed flows, but only if they're actually stale
|
||||
// This prevents race conditions where we delete a flow that's still being processed
|
||||
if (existingFlow && existingFlow.status !== 'PENDING') {
|
||||
try {
|
||||
await this.flowManager.deleteFlow(flowId, 'mcp_oauth');
|
||||
const STALE_FLOW_THRESHOLD = 2 * 60 * 1000; // 2 minutes
|
||||
const { isStale, age, status } = await this.flowManager.isFlowStale(
|
||||
flowId,
|
||||
'mcp_oauth',
|
||||
STALE_FLOW_THRESHOLD,
|
||||
);
|
||||
|
||||
if (isStale) {
|
||||
try {
|
||||
await this.flowManager.deleteFlow(flowId, 'mcp_oauth');
|
||||
logger.debug(
|
||||
`${this.logPrefix} Cleared stale ${status} OAuth flow (age: ${Math.round(age / 1000)}s)`,
|
||||
);
|
||||
} catch (error) {
|
||||
logger.warn(`${this.logPrefix} Failed to clear stale OAuth flow`, error);
|
||||
}
|
||||
} else {
|
||||
logger.debug(
|
||||
`${this.logPrefix} Cleared stale ${existingFlow.status} OAuth flow for ${flowId}`,
|
||||
`${this.logPrefix} Skipping cleanup of recent ${status} flow (age: ${Math.round(age / 1000)}s, threshold: ${STALE_FLOW_THRESHOLD / 1000}s)`,
|
||||
);
|
||||
} catch (error) {
|
||||
logger.warn(`${this.logPrefix} Failed to clear stale OAuth flow`, error);
|
||||
// If flow is recent but not pending, something might be wrong
|
||||
if (status === 'FAILED') {
|
||||
logger.warn(
|
||||
`${this.logPrefix} Recent OAuth flow failed, will retry after ${Math.round((STALE_FLOW_THRESHOLD - age) / 1000)}s`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -402,15 +416,9 @@ export class MCPConnectionFactory {
|
|||
}
|
||||
logger.info(`${this.logPrefix} OAuth flow completed, tokens received for ${this.serverName}`);
|
||||
|
||||
// Re-fetch flow state after completion to get updated credentials
|
||||
const updatedFlowState = await MCPOAuthHandler.getFlowState(
|
||||
newFlowId,
|
||||
this.flowManager as FlowStateManager<MCPOAuthTokens>,
|
||||
);
|
||||
|
||||
/** Client information from the updated flow state */
|
||||
const clientInfo = updatedFlowState?.clientInfo || flowMetadata?.clientInfo;
|
||||
const metadata = updatedFlowState?.metadata || flowMetadata?.metadata;
|
||||
/** Client information from the flow metadata */
|
||||
const clientInfo = flowMetadata?.clientInfo;
|
||||
const metadata = flowMetadata?.metadata;
|
||||
|
||||
return {
|
||||
tokens,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
import { randomBytes } from 'crypto';
|
||||
import { logger } from '@librechat/data-schemas';
|
||||
import { FetchLike } from '@modelcontextprotocol/sdk/shared/transport';
|
||||
import { OAuthMetadataSchema } from '@modelcontextprotocol/sdk/shared/auth.js';
|
||||
import {
|
||||
registerClient,
|
||||
startAuthorization,
|
||||
|
|
@ -7,7 +9,6 @@ import {
|
|||
discoverAuthorizationServerMetadata,
|
||||
discoverOAuthProtectedResourceMetadata,
|
||||
} from '@modelcontextprotocol/sdk/client/auth.js';
|
||||
import { OAuthMetadataSchema } from '@modelcontextprotocol/sdk/shared/auth.js';
|
||||
import type { MCPOptions } from 'librechat-data-provider';
|
||||
import type { FlowStateManager } from '~/flow/manager';
|
||||
import type {
|
||||
|
|
@ -18,7 +19,6 @@ import type {
|
|||
OAuthMetadata,
|
||||
} from './types';
|
||||
import { sanitizeUrlForLogging } from '~/mcp/utils';
|
||||
import { FetchLike } from '@modelcontextprotocol/sdk/shared/transport';
|
||||
|
||||
/** Type for the OAuth metadata from the SDK */
|
||||
type SDKOAuthMetadata = Parameters<typeof registerClient>[1]['metadata'];
|
||||
|
|
@ -439,9 +439,10 @@ export class MCPOAuthHandler {
|
|||
fetchFn: this.createOAuthFetch(oauthHeaders),
|
||||
});
|
||||
|
||||
logger.debug('[MCPOAuth] Raw tokens from exchange:', {
|
||||
access_token: tokens.access_token ? '[REDACTED]' : undefined,
|
||||
refresh_token: tokens.refresh_token ? '[REDACTED]' : undefined,
|
||||
logger.debug('[MCPOAuth] Token exchange successful', {
|
||||
flowId,
|
||||
has_access_token: !!tokens.access_token,
|
||||
has_refresh_token: !!tokens.refresh_token,
|
||||
expires_in: tokens.expires_in,
|
||||
token_type: tokens.token_type,
|
||||
scope: tokens.scope,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue