feat: add flush and stop methods

This commit is contained in:
2026-01-15 15:05:16 -03:00
parent f7558913ee
commit 30966ec81a
+82 -10
View File
@@ -17,6 +17,7 @@ export class StreamQueue {
private queue: Readable[] = [];
private isPlaying = false;
private mixer: MixedStream;
private currentStop: (() => void) | null = null;
constructor(mixer: MixedStream) {
this.mixer = mixer;
@@ -35,7 +36,10 @@ export class StreamQueue {
try {
if (nextStream) {
await this.mixer.playStream(nextStream);
const { completion, stop } = this.mixer.playStream(nextStream);
this.currentStop = stop;
await completion;
this.currentStop = null;
}
} catch (e) {
console.error('Queue error:', e);
@@ -47,6 +51,14 @@ export class StreamQueue {
public clear() {
this.queue = [];
if (this.currentStop) {
this.currentStop();
this.currentStop = null;
}
}
public flush() {
this.mixer.flush();
}
}
@@ -105,8 +117,12 @@ export class MixedStream {
return queue;
}
public playStream(source: Readable): Promise<void> {
return new Promise((resolve) => {
public playStream(source: Readable): {
completion: Promise<void>;
stop: () => void;
} {
let stopCallback: () => void = () => { };
const completion = new Promise<void>((resolve) => {
const mixerInput = this.mixer.createAudioInput({
channels: 2,
sampleRate: 48000,
@@ -134,27 +150,67 @@ export class MixedStream {
totalBytes += chunk.length;
});
let resolved = false;
const cleanup = () => {
if (resolved) return;
resolved = true;
try {
source.unpipe(transcoder);
source.destroy();
} catch (e) {
console.error('Error destroying source:', e);
}
try {
transcoder.unpipe(mixerInput);
transcoder.destroy();
} catch (e) {
console.error('Error destroying transcoder:', e);
}
try {
this.mixer.removeAudioinput(mixerInput);
} catch (e) {
console.error('Error removing audio input:', e);
}
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if (typeof (this.mixer as any).removeAudioInput === 'function') {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(this.mixer as any).removeAudioInput(mixerInput);
}
} catch (_) { /* ignore */ }
try {
mixerInput.destroy();
} catch (e) {
console.error('Error destroying mixer input:', e);
}
resolve();
};
stopCallback = cleanup;
transcoder.on('end', () => {
const durationMs =
(totalBytes / 192000) * 1000 * (1 + DURATION_MARGIN_PCT);
setTimeout(() => {
source.unpipe(transcoder);
transcoder.unpipe(mixerInput);
transcoder.destroy();
this.mixer.removeAudioinput(mixerInput);
resolve();
cleanup();
}, durationMs);
});
transcoder.on('error', (err) => {
console.error('Transcoder error:', err);
this.mixer.removeAudioinput(mixerInput);
resolve();
cleanup();
});
source.pipe(transcoder).pipe(mixerInput);
});
return { completion, stop: stopCallback };
}
public destroy(): void {
@@ -163,6 +219,22 @@ export class MixedStream {
this.mixer.destroy();
clearInterval(this.silenceInterval);
}
public flush(): void {
this.player.stop();
this.mixer.unpipe(this.output);
this.output.destroy();
this.output = new PassThrough({ highWaterMark: 1024 * 16 });
this.mixer.pipe(this.output);
const resource = createAudioResource(this.output, {
inputType: StreamType.Raw
});
this.player.play(resource);
}
}
export class AudioStreamManager {