📬 feat: Implement Delta Buffering System for Out-of-Order SSE Events (#11643)

* test: Add MCP tool definitions tests for server name variants

- Introduced new test cases for loading MCP tools with underscored and hyphenated server names, ensuring correct functionality and handling of tool definitions.
- Validated that the tool definitions are loaded accurately based on different server name formats, enhancing test coverage for the MCP tool integration.
- Included assertions to verify the expected behavior and properties of the loaded tools, improving reliability and maintainability of the tests.

* refactor: Extend useStepHandler to support additional delta events and buffer management

- Added support for Agents.ReasoningDeltaEvent and Agents.RunStepDeltaEvent in the TStepEvent type.
- Introduced a pendingDeltaBuffer to store deltas that arrive before their corresponding run step, ensuring they are processed in the correct order.
- Updated event handling to buffer deltas when no corresponding run step is found, improving the reliability of message processing.
- Cleared the pendingDeltaBuffer during cleanup to prevent memory leaks.
This commit is contained in:
Danny Avila 2026-02-05 14:00:54 +01:00 committed by GitHub
parent c8e4257342
commit 1ba5bf87b0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 1297 additions and 3 deletions

File diff suppressed because it is too large Load diff

View file

@ -31,6 +31,8 @@ type TStepEvent = {
event: string;
data:
| Agents.MessageDeltaEvent
| Agents.ReasoningDeltaEvent
| Agents.RunStepDeltaEvent
| Agents.AgentUpdate
| Agents.RunStep
| Agents.ToolEndEvent
@ -61,6 +63,8 @@ export default function useStepHandler({
const toolCallIdMap = useRef(new Map<string, string | undefined>());
const messageMap = useRef(new Map<string, TMessage>());
const stepMap = useRef(new Map<string, Agents.RunStep>());
/** Buffer for deltas that arrive before their corresponding run step */
const pendingDeltaBuffer = useRef(new Map<string, TStepEvent[]>());
/**
* Calculate content index for a run step.
@ -350,6 +354,14 @@ export default function useStepHandler({
setMessages(updatedMessages);
}
// Replay any deltas that were buffered under this run step's id (see pendingDeltaBuffer).
const bufferedDeltas = pendingDeltaBuffer.current.get(runStep.id);
if (bufferedDeltas && bufferedDeltas.length > 0) {
// Remove the entry before replaying, so the buffer no longer holds these deltas while stepHandler runs.
pendingDeltaBuffer.current.delete(runStep.id);
for (const bufferedDelta of bufferedDeltas) {
// Re-dispatch each delta through stepHandler in the order it was buffered.
stepHandler({ event: bufferedDelta.event, data: bufferedDelta.data }, submission);
}
}
} else if (event === 'on_agent_update') {
const { agent_update } = data as Agents.AgentUpdate;
let responseMessageId = agent_update.runId || '';
@ -391,7 +403,9 @@ export default function useStepHandler({
}
// The delta cannot be applied yet: either its run step or the response message id is missing.
if (!runStep || !responseMessageId) {
console.warn('No run step or runId found for message delta event');
// Keep the delta instead of dropping it: append it to the list stored under its id in pendingDeltaBuffer.
// NOTE(review): presumably replayed once the run step with this id is handled — confirm. Also verify
// whether a delta buffered only because responseMessageId is empty is ever replayed.
const buffer = pendingDeltaBuffer.current.get(messageDelta.id) ?? [];
buffer.push({ event: 'on_message_delta', data: messageDelta });
pendingDeltaBuffer.current.set(messageDelta.id, buffer);
return;
}
@ -432,7 +446,9 @@ export default function useStepHandler({
}
// The delta cannot be applied yet: either its run step or the response message id is missing.
if (!runStep || !responseMessageId) {
console.warn('No run step or runId found for reasoning delta event');
// Keep the delta instead of dropping it: append it to the list stored under its id in pendingDeltaBuffer.
// NOTE(review): presumably replayed once the run step with this id is handled — confirm. Also verify
// whether a delta buffered only because responseMessageId is empty is ever replayed.
const buffer = pendingDeltaBuffer.current.get(reasoningDelta.id) ?? [];
buffer.push({ event: 'on_reasoning_delta', data: reasoningDelta });
pendingDeltaBuffer.current.set(reasoningDelta.id, buffer);
return;
}
@ -473,7 +489,9 @@ export default function useStepHandler({
}
// The delta cannot be applied yet: either its run step or the response message id is missing.
if (!runStep || !responseMessageId) {
console.warn('No run step or runId found for run step delta event');
// Keep the delta instead of dropping it: append it to the list stored under its id in pendingDeltaBuffer.
// NOTE(review): presumably replayed once the run step with this id is handled — confirm. Also verify
// whether a delta buffered only because responseMessageId is empty is ever replayed.
const buffer = pendingDeltaBuffer.current.get(runStepDelta.id) ?? [];
buffer.push({ event: 'on_run_step_delta', data: runStepDelta });
pendingDeltaBuffer.current.set(runStepDelta.id, buffer);
return;
}
@ -578,6 +596,7 @@ export default function useStepHandler({
toolCallIdMap.current.clear();
messageMap.current.clear();
stepMap.current.clear();
pendingDeltaBuffer.current.clear();
}, []);
/**

View file

@ -284,6 +284,196 @@ describe('definitions.ts', () => {
});
});
describe('MCP tool definitions with server name variants', () => {
  /** Assembles the dependency bundle (all mocks) handed to loadToolDefinitions in each case. */
  const buildDeps = (): LoadToolDefinitionsDeps => ({
    getOrFetchMCPServerTools: mockGetOrFetchMCPServerTools,
    isBuiltInTool: mockIsBuiltInTool,
    loadAuthValues: mockLoadAuthValues,
  });

  it('should load MCP tools with underscored server names (server_one)', async () => {
    // Two tools returned by the mocked server fetch, keyed by their full tool names.
    const serverTools = {
      list_items_mcp_server_one: {
        function: {
          name: 'list_items_mcp_server_one',
          description: 'List all items from server',
          parameters: {
            type: 'object',
            properties: {
              limit: { type: 'number', description: 'Max items to return' },
            },
          },
        },
      },
      get_item_mcp_server_one: {
        function: {
          name: 'get_item_mcp_server_one',
          description: 'Get a specific item',
          parameters: {
            type: 'object',
            properties: {
              id: { type: 'string', description: 'Item ID' },
            },
            required: ['id'],
          },
        },
      },
    };
    mockGetOrFetchMCPServerTools.mockResolvedValue(serverTools);

    const request: LoadToolDefinitionsParams = {
      userId: 'user-123',
      agentId: 'agent-123',
      tools: ['sys__all__sys_mcp_server_one'],
    };
    const { toolDefinitions } = await loadToolDefinitions(request, buildDeps());

    // The server name passed to the fetcher must keep its underscore.
    expect(mockGetOrFetchMCPServerTools).toHaveBeenCalledWith('user-123', 'server_one');
    expect(toolDefinitions).toHaveLength(2);

    const listDefinition = toolDefinitions.find(
      ({ name }) => name === 'list_items_mcp_server_one',
    );
    expect(listDefinition).toBeDefined();
    expect(listDefinition?.description).toBe('List all items from server');

    const getDefinition = toolDefinitions.find(({ name }) => name === 'get_item_mcp_server_one');
    expect(getDefinition).toBeDefined();
    expect(getDefinition?.description).toBe('Get a specific item');
  });

  it('should load MCP tools with hyphenated server names (server-one)', async () => {
    // Same two tools as the underscored case, but the server name contains a hyphen.
    const serverTools = {
      'list_items_mcp_server-one': {
        function: {
          name: 'list_items_mcp_server-one',
          description: 'List all items from server',
          parameters: {
            type: 'object',
            properties: {
              limit: { type: 'number', description: 'Max items to return' },
            },
          },
        },
      },
      'get_item_mcp_server-one': {
        function: {
          name: 'get_item_mcp_server-one',
          description: 'Get a specific item',
          parameters: {
            type: 'object',
            properties: {
              id: { type: 'string', description: 'Item ID' },
            },
            required: ['id'],
          },
        },
      },
    };
    mockGetOrFetchMCPServerTools.mockResolvedValue(serverTools);

    const request: LoadToolDefinitionsParams = {
      userId: 'user-123',
      agentId: 'agent-123',
      tools: ['sys__all__sys_mcp_server-one'],
    };
    const { toolDefinitions } = await loadToolDefinitions(request, buildDeps());

    // The server name passed to the fetcher must keep its hyphen.
    expect(mockGetOrFetchMCPServerTools).toHaveBeenCalledWith('user-123', 'server-one');
    expect(toolDefinitions).toHaveLength(2);

    const listDefinition = toolDefinitions.find(
      ({ name }) => name === 'list_items_mcp_server-one',
    );
    expect(listDefinition).toBeDefined();
    expect(listDefinition?.description).toBe('List all items from server');

    const getDefinition = toolDefinitions.find(({ name }) => name === 'get_item_mcp_server-one');
    expect(getDefinition).toBeDefined();
    expect(getDefinition?.description).toBe('Get a specific item');
  });

  it('should handle individual MCP tool lookup with hyphenated server name', async () => {
    // A single tool requested by its own name rather than through the "all tools" entry.
    const serverTools = {
      'list_items_mcp_server-one': {
        function: {
          name: 'list_items_mcp_server-one',
          description: 'List all items from server',
          parameters: {
            type: 'object',
            properties: {},
          },
        },
      },
    };
    mockGetOrFetchMCPServerTools.mockResolvedValue(serverTools);

    const request: LoadToolDefinitionsParams = {
      userId: 'user-123',
      agentId: 'agent-123',
      tools: ['list_items_mcp_server-one'],
    };
    const { toolDefinitions } = await loadToolDefinitions(request, buildDeps());

    expect(mockGetOrFetchMCPServerTools).toHaveBeenCalledWith('user-123', 'server-one');
    expect(toolDefinitions).toHaveLength(1);
    expect(toolDefinitions[0].name).toBe('list_items_mcp_server-one');
  });

  it('should include hyphenated server name tools in registry with correct serverName', async () => {
    const serverTools = {
      'list_items_mcp_my-server': {
        function: {
          name: 'list_items_mcp_my-server',
          description: 'List items',
          parameters: { type: 'object', properties: {} },
        },
      },
    };
    mockGetOrFetchMCPServerTools.mockResolvedValue(serverTools);

    const request: LoadToolDefinitionsParams = {
      userId: 'user-123',
      agentId: 'agent-123',
      tools: ['sys__all__sys_mcp_my-server'],
    };
    const { toolDefinitions, toolRegistry } = await loadToolDefinitions(request, buildDeps());

    expect(toolDefinitions).toHaveLength(1);
    expect(toolRegistry.size).toBeGreaterThan(0);

    // The definition must carry the hyphenated server name unchanged.
    const [definition] = toolDefinitions;
    expect(definition.name).toBe('list_items_mcp_my-server');
    expect((definition as { serverName?: string }).serverName).toBe('my-server');
  });
});
describe('tool registry metadata', () => {
it('should include description and parameters in registry for action tools', async () => {
const mockActionDefs: ActionToolDefinition[] = [