LibreChat/api/app/clients/TextStream.js
Danny Avila ea1dd59ef4
refactor(api): Central Logging 📜 (#1348)
* WIP: initial logging changes
add several transports in ~/config/winston
omit messages in logs, truncate long strings
add short blurb in dotenv for debug logging
GoogleClient: using logger
OpenAIClient: using logger, handleOpenAIErrors
Adding typedef for payload message
bumped winston and using winston-daily-rotate-file
moved config for server paths to ~/config dir
Added `DEBUG_LOGGING=true` to .env.example

* WIP: Refactor logging statements in code

* WIP: Refactor logging statements and import configurations

* WIP: Refactor logging statements and import configurations

* refactor: broadcast Redis initialization message with `info` not `debug`

* refactor: complete Refactor logging statements and import configurations

* chore: delete unused tools

* fix: circular dependencies due to accessing logger

* refactor(handleText): handle booleans and write tests

* refactor: redact sensitive values, better formatting

* chore: improve log formatting, avoid passing strings to 2nd arg

* fix(ci): fix jest tests due to logger changes

* refactor(getAvailablePluginsController): cache plugins, as they are static, to avoid an async addOpenAPISpecs call every time

* chore: update docs

* chore: update docs

* chore: create separate meiliSync logger, clean up logs to avoid being unnecessarily verbose

* chore: spread objects where they are commonly logged to allow string truncation

* chore: improve error log formatting
2023-12-14 07:49:27 -05:00

60 lines
1.6 KiB
JavaScript

const { Readable } = require('stream');
const { logger } = require('~/config');
/**
 * Readable stream that emits a fixed string in small, randomly sized pieces,
 * waiting `delay` milliseconds before each piece.
 */
class TextStream extends Readable {
  /**
   * @param {string} text - The full text to emit.
   * @param {object} [options] - Forwarded to `Readable`; the fields below are also read from it.
   * @param {number} [options.minChunkSize=2] - Smallest chunk length, in UTF-16 code units.
   * @param {number} [options.maxChunkSize=4] - Largest chunk length (inclusive), in UTF-16 code
   *   units. A chunk may be one unit longer when that keeps a surrogate pair together.
   * @param {number} [options.delay=20] - Milliseconds to wait before emitting each chunk.
   */
  constructor(text, options = {}) {
    super(options);
    this.text = text;
    this.currentIndex = 0;
    this.minChunkSize = options.minChunkSize ?? 2;
    this.maxChunkSize = options.maxChunkSize ?? 4;
    this.delay = options.delay ?? 20; // Time in milliseconds
  }

  /**
   * Readable hook: schedules the next chunk after `delay` ms, or signals
   * end-of-stream once the whole text has been emitted.
   */
  _read() {
    const { delay, minChunkSize, maxChunkSize } = this;

    if (this.currentIndex >= this.text.length) {
      this.push(null); // signal end of data
      return;
    }

    setTimeout(() => {
      const remainingChars = this.text.length - this.currentIndex;
      const requestedSize = this.randomInt(minChunkSize, maxChunkSize + 1);
      // Advance by at least one code unit so a zero (or negative) configured
      // size cannot produce an endless series of empty chunks.
      const chunkSize = Math.min(Math.max(requestedSize, 1), remainingChars);
      // Each pushed string is encoded to bytes on its own, so a boundary in the
      // middle of a surrogate pair would turn both halves into U+FFFD.
      const chunkEnd = this.alignChunkEnd(this.currentIndex + chunkSize);
      const chunk = this.text.slice(this.currentIndex, chunkEnd);
      this.push(chunk);
      this.currentIndex = chunkEnd;
    }, delay);
  }

  /**
   * Moves a proposed chunk end forward by one code unit when it would fall
   * between the two halves of a UTF-16 surrogate pair.
   *
   * @param {number} end - Proposed exclusive end index into `this.text`.
   * @returns {number} `end`, or `end + 1` when `end` would split a surrogate pair.
   */
  alignChunkEnd(end) {
    const { text } = this;
    if (typeof text !== 'string' || end <= 0 || end >= text.length) {
      return end;
    }

    const before = text.charCodeAt(end - 1);
    const after = text.charCodeAt(end);
    const isHighSurrogate = before >= 0xd800 && before <= 0xdbff;
    const isLowSurrogate = after >= 0xdc00 && after <= 0xdfff;
    return isHighSurrogate && isLowSurrogate ? end + 1 : end;
  }

  /**
   * @param {number} min - Inclusive lower bound.
   * @param {number} max - Exclusive upper bound.
   * @returns {number} A pseudo-random integer in `[min, max)`.
   */
  randomInt(min, max) {
    return Math.floor(Math.random() * (max - min)) + min;
  }

  /**
   * Consumes the stream, passing each chunk (as a string) to the callback.
   *
   * @param {(chunk: string) => void} onProgressCallback - Invoked once per emitted chunk.
   * @returns {Promise<void>} Resolves when the stream ends. A stream error is
   *   logged and then the promise still resolves; it is not rethrown.
   */
  async processTextStream(onProgressCallback) {
    const streamPromise = new Promise((resolve, reject) => {
      this.on('data', (chunk) => {
        onProgressCallback(chunk.toString());
      });

      this.on('end', () => {
        resolve();
      });

      this.on('error', (err) => {
        reject(err);
      });
    });

    try {
      await streamPromise;
    } catch (err) {
      // Best-effort by design: the failure is recorded but callers are not interrupted.
      logger.error('[processTextStream] Error in text stream:', err);
    }
  }
}
module.exports = TextStream;