feat: Implement WebRTC messaging and audio handling in the WebRTC service

Marco Beretta 2024-12-21 16:18:23 +01:00
parent cf4b73b5e3
commit 9a33292f88
No known key found for this signature in database
GPG key ID: D918033D8E74CC11
8 changed files with 674 additions and 137 deletions
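The diff below routes call signaling and audio transport through a single WebSocket endpoint. For orientation, here is a minimal sketch of a browser-side counterpart, assuming only the message types visible in this diff (call-start, webrtc-offer, ice-candidate); the endpoint URL and the webrtc-answer reply type are assumptions, since the client code is in another file and the server hunk shown here has no answer handler yet.

// Hypothetical browser client -- a sketch, not part of this commit.
const ws = new WebSocket('ws://localhost:3000/ws'); // URL assumed
const pc = new RTCPeerConnection({
  iceServers: [{ urls: 'stun:stun.l.google.com:19302' }],
});

ws.onopen = () => ws.send(JSON.stringify({ type: 'call-start' }));

ws.onmessage = async (event) => {
  const message = JSON.parse(event.data);
  if (message.type === 'webrtc-offer') {
    await pc.setRemoteDescription(message.sdp);
    await pc.setLocalDescription(await pc.createAnswer());
    // 'webrtc-answer' is assumed; the server below does not handle it yet.
    ws.send(JSON.stringify({ type: 'webrtc-answer', sdp: pc.localDescription }));
  } else if (message.type === 'ice-candidate') {
    await pc.addIceCandidate(message.candidate);
  }
};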


@@ -1,12 +1,15 @@
 const { WebSocketServer } = require('ws');
 const fs = require('fs');
 const path = require('path');
+const { RTCPeerConnection } = require('wrtc');
 module.exports.WebSocketService = class {
   constructor(server) {
     this.wss = new WebSocketServer({ server, path: '/ws' });
     this.log('Server initialized');
-    this.clientAudioBuffers = new Map();
+    this.activeClients = new Map();
+    this.iceServers = [
+      { urls: 'stun:stun.l.google.com:19302' },
+      { urls: 'stun:stun1.l.google.com:19302' },
+    ];
     this.setupHandlers();
   }
@@ -17,7 +20,13 @@ module.exports.WebSocketService = class {
   setupHandlers() {
     this.wss.on('connection', (ws) => {
       const clientId = Date.now().toString();
-      this.clientAudioBuffers.set(clientId, []);
+      this.activeClients.set(clientId, {
+        ws,
+        state: 'idle',
+        audioBuffer: [],
+        currentTranscription: '',
+        isProcessing: false,
+      });
       this.log(`Client connected: ${clientId}`);
@@ -29,42 +38,175 @@ module.exports.WebSocketService = class {
           return;
         }
-        if (message.type === 'audio-chunk') {
-          if (!this.clientAudioBuffers.has(clientId)) {
-            this.clientAudioBuffers.set(clientId, []);
-          }
-          this.clientAudioBuffers.get(clientId).push(message.data);
-        }
+        switch (message.type) {
+          case 'call-start':
+            this.handleCallStart(clientId);
+            break;
-        if (message.type === 'request-response') {
-          const filePath = path.join(__dirname, './assets/response.mp3');
-          const audioFile = fs.readFileSync(filePath);
-          ws.send(JSON.stringify({ type: 'audio-response', data: audioFile.toString('base64') }));
-        }
+          case 'audio-chunk':
+            await this.handleAudioChunk(clientId, message.data);
+            break;
-        if (message.type === 'call-ended') {
-          const allChunks = this.clientAudioBuffers.get(clientId);
-          this.writeAudioFile(clientId, allChunks);
-          this.clientAudioBuffers.delete(clientId);
+          case 'processing-start':
+            await this.processAudioStream(clientId);
+            break;
+          case 'audio-received':
+            this.confirmAudioReceived(clientId);
+            break;
+          case 'call-ended':
+            this.handleCallEnd(clientId);
+            break;
         }
       });
       ws.on('close', () => {
+        this.handleCallEnd(clientId);
+        this.activeClients.delete(clientId);
         this.log(`Client disconnected: ${clientId}`);
-        this.clientAudioBuffers.delete(clientId);
       });
+      ws.on('error', (error) => {
+        this.log(`Error for client ${clientId}: ${error.message}`);
+        this.handleCallEnd(clientId);
+      });
     });
   }
-  writeAudioFile(clientId, base64Chunks) {
-    if (!base64Chunks || base64Chunks.length === 0) {
+  async handleCallStart(clientId) {
+    const client = this.activeClients.get(clientId);
+    if (!client) {
       return;
     }
-    const filePath = path.join(__dirname, `recorded_${clientId}.webm`);
-    const buffer = Buffer.concat(
-      base64Chunks.map((chunk) => Buffer.from(chunk.split(',')[1], 'base64')),
+    try {
+      client.state = 'active';
+      client.audioBuffer = [];
+      client.currentTranscription = '';
+      client.isProcessing = false;
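+      // One RTCPeerConnection per call; ICE candidates and the SDP offer
+      // are relayed to the client over the existing WebSocket connection.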
+      const peerConnection = new RTCPeerConnection({
+        iceServers: this.iceServers,
+        sdpSemantics: 'unified-plan',
+      });
+      client.peerConnection = peerConnection;
+      client.dataChannel = peerConnection.createDataChannel('audio', {
+        ordered: true,
+        maxRetransmits: 3,
+      });
+      client.dataChannel.onopen = () => this.log(`Data channel opened for ${clientId}`);
+      client.dataChannel.onmessage = async (event) => {
+        await this.handleAudioChunk(clientId, event.data);
+      };
+      peerConnection.onicecandidate = (event) => {
+        if (event.candidate) {
+          client.ws.send(
+            JSON.stringify({
+              type: 'ice-candidate',
+              candidate: event.candidate,
+            }),
+          );
+        }
+      };
+      peerConnection.onnegotiationneeded = async () => {
+        try {
+          const offer = await peerConnection.createOffer();
+          await peerConnection.setLocalDescription(offer);
+          client.ws.send(
+            JSON.stringify({
+              type: 'webrtc-offer',
+              sdp: peerConnection.localDescription,
+            }),
+          );
+        } catch (error) {
+          this.log(`Negotiation failed for ${clientId}: ${error}`);
+        }
+      };
+      this.log(`Call started for client ${clientId}`);
+    } catch (error) {
+      this.log(`Error starting call for ${clientId}: ${error.message}`);
+      this.handleCallEnd(clientId);
+    }
+  }
+  async handleAudioChunk(clientId, data) {
+    const client = this.activeClients.get(clientId);
+    if (!client || client.state !== 'active') {
+      return;
+    }
+    client.audioBuffer.push(data);
+    client.ws.send(JSON.stringify({ type: 'audio-received' }));
+  }
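+  // NOTE: the sends below are placeholders for the intended
+  // transcription -> LLM -> TTS pipeline; no real processing happens yet.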
+  async processAudioStream(clientId) {
+    const client = this.activeClients.get(clientId);
+    if (!client || client.state !== 'active' || client.isProcessing) {
+      return;
+    }
+    client.isProcessing = true;
+    try {
+      // Process transcription
+      client.ws.send(
+        JSON.stringify({
+          type: 'transcription',
+          data: 'Processing audio...',
+        }),
+      );
+      // Stream LLM response
+      client.ws.send(
+        JSON.stringify({
+          type: 'llm-response',
+          data: 'Processing response...',
+        }),
+      );
+      // Stream TTS chunks
+      client.ws.send(
+        JSON.stringify({
+          type: 'tts-chunk',
+          data: 'audio_data_here',
+        }),
+      );
+    } catch (error) {
+      this.log(`Processing error for client ${clientId}: ${error.message}`);
+    } finally {
+      client.isProcessing = false;
+      client.audioBuffer = [];
+    }
+  }
+  confirmAudioReceived(clientId) {
+    const client = this.activeClients.get(clientId);
+    if (!client) {
+      return;
+    }
+    client.ws.send(
+      JSON.stringify({
+        type: 'audio-received',
+        data: null,
+      }),
     );
-    fs.writeFileSync(filePath, buffer);
-    this.log(`Saved audio to ${filePath}`);
   }
+  handleCallEnd(clientId) {
+    const client = this.activeClients.get(clientId);
+    if (!client) {
+      return;
+    }
+    // Release WebRTC resources so a later call can renegotiate cleanly.
+    if (client.dataChannel) {
+      client.dataChannel.close();
+    }
+    if (client.peerConnection) {
+      client.peerConnection.close();
+    }
+    client.state = 'idle';
+    client.audioBuffer = [];
+    client.currentTranscription = '';
+  }
 };
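Usage is unchanged by this commit: the constructor still takes an HTTP server and attaches a WebSocketServer at path '/ws'. A minimal wiring sketch; the require path and port are assumptions:

// Hypothetical wiring -- file path and port are assumptions.
const http = require('http');
const { WebSocketService } = require('./services/websocket'); // path assumed

const server = http.createServer();
new WebSocketService(server); // attaches a WebSocketServer at path '/ws'
server.listen(3000, () => console.log('Listening on :3000'));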