From 30966ec81a7fb6def8577fc8e070519fb1113eab Mon Sep 17 00:00:00 2001 From: neru Date: Thu, 15 Jan 2026 15:05:16 -0300 Subject: [PATCH] feat: add flush and stop methods --- src/modules/audioStreams.ts | 92 +++++++++++++++++++++++++++++++++---- 1 file changed, 82 insertions(+), 10 deletions(-) diff --git a/src/modules/audioStreams.ts b/src/modules/audioStreams.ts index 271d043..dfcef87 100644 --- a/src/modules/audioStreams.ts +++ b/src/modules/audioStreams.ts @@ -17,6 +17,7 @@ export class StreamQueue { private queue: Readable[] = []; private isPlaying = false; private mixer: MixedStream; + private currentStop: (() => void) | null = null; constructor(mixer: MixedStream) { this.mixer = mixer; @@ -35,7 +36,10 @@ export class StreamQueue { try { if (nextStream) { - await this.mixer.playStream(nextStream); + const { completion, stop } = this.mixer.playStream(nextStream); + this.currentStop = stop; + await completion; + this.currentStop = null; } } catch (e) { console.error('Queue error:', e); @@ -47,6 +51,14 @@ export class StreamQueue { public clear() { this.queue = []; + if (this.currentStop) { + this.currentStop(); + this.currentStop = null; + } + } + + public flush() { + this.mixer.flush(); } } @@ -105,8 +117,12 @@ export class MixedStream { return queue; } - public playStream(source: Readable): Promise { - return new Promise((resolve) => { + public playStream(source: Readable): { + completion: Promise; + stop: () => void; + } { + let stopCallback: () => void = () => { }; + const completion = new Promise((resolve) => { const mixerInput = this.mixer.createAudioInput({ channels: 2, sampleRate: 48000, @@ -134,27 +150,67 @@ export class MixedStream { totalBytes += chunk.length; }); + let resolved = false; + const cleanup = () => { + if (resolved) return; + resolved = true; + try { + source.unpipe(transcoder); + source.destroy(); + } catch (e) { + console.error('Error destroying source:', e); + } + + try { + transcoder.unpipe(mixerInput); + transcoder.destroy(); + } catch (e) { + console.error('Error destroying transcoder:', e); + } + + try { + this.mixer.removeAudioinput(mixerInput); + } catch (e) { + console.error('Error removing audio input:', e); + } + + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + if (typeof (this.mixer as any).removeAudioInput === 'function') { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (this.mixer as any).removeAudioInput(mixerInput); + } + } catch (_) { /* ignore */ } + + try { + mixerInput.destroy(); + } catch (e) { + console.error('Error destroying mixer input:', e); + } + + resolve(); + }; + + stopCallback = cleanup; + transcoder.on('end', () => { const durationMs = (totalBytes / 192000) * 1000 * (1 + DURATION_MARGIN_PCT); setTimeout(() => { - source.unpipe(transcoder); - transcoder.unpipe(mixerInput); - transcoder.destroy(); - this.mixer.removeAudioinput(mixerInput); - resolve(); + cleanup(); }, durationMs); }); transcoder.on('error', (err) => { console.error('Transcoder error:', err); - this.mixer.removeAudioinput(mixerInput); - resolve(); + cleanup(); }); source.pipe(transcoder).pipe(mixerInput); }); + + return { completion, stop: stopCallback }; } public destroy(): void { @@ -163,6 +219,22 @@ export class MixedStream { this.mixer.destroy(); clearInterval(this.silenceInterval); } + + public flush(): void { + this.player.stop(); + + this.mixer.unpipe(this.output); + this.output.destroy(); + + this.output = new PassThrough({ highWaterMark: 1024 * 16 }); + this.mixer.pipe(this.output); + + const resource = createAudioResource(this.output, { + inputType: StreamType.Raw + }); + + this.player.play(resource); + } } export class AudioStreamManager {