WIP: fix: fix abort messages and continue conversation on abort

feat(askOpenAI.js): add abort endpoint to cancel requests
feat(MessageHandler): add abort functionality to cancel requests
feat(submission.js): add lastResponse and source atoms to store
feat(handleSubmit.js): add stopGenerating function to cancel requests
This commit is contained in:
Daniel Avila 2023-04-08 23:19:29 -04:00
parent 5fbefa15ce
commit 88aea81288
5 changed files with 88 additions and 12 deletions

View file

@ -6,6 +6,22 @@ const { titleConvo, askClient } = require('../../../app/');
const { saveMessage, getConvoTitle, saveConvo, updateConvo, getConvo } = require('../../../models'); const { saveMessage, getConvoTitle, saveConvo, updateConvo, getConvo } = require('../../../models');
const { handleError, sendMessage, createOnProgress, handleText } = require('./handlers'); const { handleError, sendMessage, createOnProgress, handleText } = require('./handlers');
const abortControllers = new Map();
router.get('/abort', (req, res) => {
const requestId = req.query.requestId;
if (abortControllers.has(requestId)) {
const abortController = abortControllers.get(requestId);
abortController.abort();
abortControllers.delete(requestId);
console.log('Aborted request', requestId);
res.status(200).send('Aborted');
} else {
res.status(404).send('Request not found');
}
});
router.post('/', async (req, res) => { router.post('/', async (req, res) => {
const { const {
endpoint, endpoint,
@ -100,7 +116,14 @@ const ask = async ({
try { try {
const progressCallback = createOnProgress(); const progressCallback = createOnProgress();
const abortController = new AbortController(); const abortController = new AbortController();
res.on('close', () => abortController.abort()); const abortKey = userMessage.messageId;
abortControllers.set(abortKey, abortController);
res.on('close', () => {
console.log('stopped message, aborting');
abortController.abort();
return res.end();
});
let response = await askClient({ let response = await askClient({
text, text,
parentMessageId: userParentMessageId, parentMessageId: userParentMessageId,
@ -170,6 +193,7 @@ const ask = async ({
requestMessage: userMessage, requestMessage: userMessage,
responseMessage: responseMessage responseMessage: responseMessage
}); });
abortControllers.delete(abortKey);
res.end(); res.end();
if (userParentMessageId == '00000000-0000-0000-0000-000000000000') { if (userParentMessageId == '00000000-0000-0000-0000-000000000000') {

View file

@ -62,7 +62,8 @@ export default function TextChat({ isSearchView = false }) {
setText(''); setText('');
}; };
const handleStopGenerating = () => { const handleStopGenerating = (e) => {
e.preventDefault();
stopGenerating(); stopGenerating();
}; };

View file

@ -1,5 +1,5 @@
import { useEffect } from 'react'; import { useEffect, useState } from 'react';
import { useRecoilValue, useResetRecoilState, useSetRecoilState } from 'recoil'; import { useRecoilValue, useRecoilState, useResetRecoilState, useSetRecoilState } from 'recoil';
import { SSE } from '~/data-provider/sse.mjs'; import { SSE } from '~/data-provider/sse.mjs';
import createPayload from '~/data-provider/createPayload'; import createPayload from '~/data-provider/createPayload';
@ -11,6 +11,10 @@ export default function MessageHandler() {
const setMessages = useSetRecoilState(store.messages); const setMessages = useSetRecoilState(store.messages);
const setConversation = useSetRecoilState(store.conversation); const setConversation = useSetRecoilState(store.conversation);
const resetLatestMessage = useResetRecoilState(store.latestMessage); const resetLatestMessage = useResetRecoilState(store.latestMessage);
const [lastResponse, setLastResponse] = useRecoilState(store.lastResponse);
const setSubmission = useSetRecoilState(store.submission);
const [source, setSource] = useState(null);
const [abortKey, setAbortKey] = useState(null);
const { refreshConversations } = store.useConversations(); const { refreshConversations } = store.useConversations();
@ -45,7 +49,7 @@ export default function MessageHandler() {
const cancelHandler = (data, submission) => { const cancelHandler = (data, submission) => {
const { messages, message, initialResponse, isRegenerate = false } = submission; const { messages, message, initialResponse, isRegenerate = false } = submission;
if (isRegenerate) if (isRegenerate) {
setMessages([ setMessages([
...messages, ...messages,
{ {
@ -56,7 +60,7 @@ export default function MessageHandler() {
cancelled: true cancelled: true
} }
]); ]);
else } else {
setMessages([ setMessages([
...messages, ...messages,
message, message,
@ -65,9 +69,12 @@ export default function MessageHandler() {
text: data, text: data,
parentMessageId: message?.messageId, parentMessageId: message?.messageId,
messageId: message?.messageId + '_', messageId: message?.messageId + '_',
cancelled: true // cancelled: true
} }
]); ]);
setLastResponse('');
setSource(null);
}
}; };
const createdHandler = (data, submission) => { const createdHandler = (data, submission) => {
@ -149,7 +156,33 @@ export default function MessageHandler() {
if (submission === null) return; if (submission === null) return;
if (Object.keys(submission).length === 0) return; if (Object.keys(submission).length === 0) return;
let { message } = submission; let { message, cancel } = submission;
if (cancel && source) {
console.log('message aborted', submission);
source.close();
const { endpoint } = submission.conversation;
const latestMessage = lastResponse.replaceAll('█', '');
fetch(`/api/ask/${endpoint}/abort?requestId=${abortKey}`)
.then(response => {
if (response.ok) {
console.log('Request aborted');
} else {
console.error('Error aborting request');
}
})
.catch(error => {
console.error(error);
});
console.log('source closed, got this far');
cancelHandler(latestMessage, { ...submission, message });
setIsSubmitting(false);
setSubmission(null);
return;
}
// events.oncancel = () => cancelHandler(latestResponseText, { ...submission, message });
const { server, payload } = createPayload(submission); const { server, payload } = createPayload(submission);
@ -158,7 +191,9 @@ export default function MessageHandler() {
headers: { 'Content-Type': 'application/json' } headers: { 'Content-Type': 'application/json' }
}); });
let latestResponseText = ''; setSource(events);
// let latestResponseText = '';
events.onmessage = e => { events.onmessage = e => {
const data = JSON.parse(e.data); const data = JSON.parse(e.data);
@ -173,12 +208,14 @@ export default function MessageHandler() {
}; };
createdHandler(data, { ...submission, message }); createdHandler(data, { ...submission, message });
console.log('created', message); console.log('created', message);
setAbortKey(message?.messageId);
} else { } else {
let text = data.text || data.response; let text = data.text || data.response;
if (data.initial) console.log(data); if (data.initial) console.log(data);
if (data.message) { if (data.message) {
latestResponseText = text; // latestResponseText = text;
setLastResponse(text);
messageHandler(text, { ...submission, message }); messageHandler(text, { ...submission, message });
} }
// console.log('dataStream', data); // console.log('dataStream', data);
@ -187,7 +224,7 @@ export default function MessageHandler() {
events.onopen = () => console.log('connection is opened'); events.onopen = () => console.log('connection is opened');
events.oncancel = () => cancelHandler(latestResponseText, { ...submission, message }); // events.oncancel = () => cancelHandler(latestResponseText, { ...submission, message });
events.onerror = function (e) { events.onerror = function (e) {
console.log('error in opening conn.'); console.log('error in opening conn.');
@ -204,6 +241,7 @@ export default function MessageHandler() {
return () => { return () => {
const isCancelled = events.readyState <= 1; const isCancelled = events.readyState <= 1;
events.close(); events.close();
setSource(null);
if (isCancelled) { if (isCancelled) {
const e = new Event('cancel'); const e = new Event('cancel');
events.dispatchEvent(e); events.dispatchEvent(e);

View file

@ -31,7 +31,19 @@ const isSubmitting = atom({
default: false, default: false,
}); });
const lastResponse = atom({
key: "lastResponse",
default: '',
});
const source = atom({
key: "source",
default: null,
});
export default { export default {
submission, submission,
isSubmitting, isSubmitting,
lastResponse,
source,
}; };

View file

@ -138,7 +138,8 @@ const useMessageHandler = () => {
}; };
const stopGenerating = () => { const stopGenerating = () => {
setSubmission(null); // setSubmission(null);
setSubmission(prev => ({ ...prev, cancel: true }));
}; };
return { ask, regenerate, stopGenerating }; return { ask, regenerate, stopGenerating };