feat: add audio streams, queue and manager
This commit is contained in:
@@ -0,0 +1,182 @@
|
||||
import { AudioPlayer, createAudioPlayer, createAudioResource, StreamType, VoiceConnection } from "@discordjs/voice";
|
||||
import { AudioMixer } from "node-audio-mixer";
|
||||
import { PassThrough, Readable } from "stream";
|
||||
|
||||
import prism from "prism-media";
|
||||
|
||||
export class StreamQueue {
|
||||
private queue: Readable[] = [];
|
||||
private isPlaying = false;
|
||||
private mixer: MixedStream;
|
||||
|
||||
constructor(mixer: MixedStream) {
|
||||
this.mixer = mixer;
|
||||
}
|
||||
|
||||
public enqueue(resource: Readable) {
|
||||
this.queue.push(resource);
|
||||
this.processQueue();
|
||||
}
|
||||
|
||||
private async processQueue() {
|
||||
if (this.isPlaying || this.queue.length === 0) return;
|
||||
|
||||
this.isPlaying = true;
|
||||
const nextStream = this.queue.shift();
|
||||
|
||||
try {
|
||||
if (nextStream) {
|
||||
await this.mixer.playStream(nextStream);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error("Queue error:", e);
|
||||
} finally {
|
||||
this.isPlaying = false;
|
||||
this.processQueue();
|
||||
}
|
||||
}
|
||||
|
||||
public clear() {
|
||||
this.queue = [];
|
||||
}
|
||||
}
|
||||
|
||||
export class MixedStream {
|
||||
public readonly player: AudioPlayer;
|
||||
private mixer: AudioMixer;
|
||||
private output: PassThrough;
|
||||
private silenceInterval: NodeJS.Timeout;
|
||||
|
||||
private queues: Map<string, StreamQueue> = new Map();
|
||||
|
||||
public constructor() {
|
||||
this.player = createAudioPlayer();
|
||||
|
||||
this.mixer = new AudioMixer({
|
||||
channels: 2,
|
||||
bitDepth: 16,
|
||||
sampleRate: 48000,
|
||||
autoClose: false,
|
||||
generateSilence: false // does not work :<
|
||||
});
|
||||
|
||||
const silenceInput = this.mixer.createAudioInput({
|
||||
channels: 2,
|
||||
sampleRate: 48000,
|
||||
bitDepth: 16,
|
||||
volume: 100
|
||||
})
|
||||
|
||||
const chunk = Buffer.alloc(3840);
|
||||
this.silenceInterval = setInterval(() => {
|
||||
if (silenceInput.writable && silenceInput.writableLength < 3840 * 10) {
|
||||
silenceInput.write(chunk);
|
||||
}
|
||||
}, 20);
|
||||
|
||||
this.output = new PassThrough({ highWaterMark: 1024 * 16 });
|
||||
this.mixer.pipe(this.output);
|
||||
|
||||
const resource = createAudioResource(this.output, {
|
||||
inputType: StreamType.Raw
|
||||
});
|
||||
|
||||
this.player.play(resource);
|
||||
this.player.on('error', error => {
|
||||
console.error('Error: ', error.message);
|
||||
});
|
||||
}
|
||||
|
||||
public getQueue(name: string): StreamQueue {
|
||||
let queue = this.queues.get(name);
|
||||
if (!queue) {
|
||||
queue = new StreamQueue(this);
|
||||
this.queues.set(name, queue);
|
||||
}
|
||||
return queue;
|
||||
}
|
||||
|
||||
public playStream(source: Readable): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const mixerInput = this.mixer.createAudioInput({
|
||||
channels: 2,
|
||||
sampleRate: 48000,
|
||||
bitDepth: 16,
|
||||
volume: 100,
|
||||
});
|
||||
|
||||
const transcoder = new prism.FFmpeg({
|
||||
args: [
|
||||
'-analyzeduration', '0',
|
||||
'-loglevel', '0',
|
||||
'-f', 's16le',
|
||||
'-ar', '48000',
|
||||
'-ac', '2',
|
||||
],
|
||||
});
|
||||
let totalBytes = 0;
|
||||
|
||||
transcoder.on('data', (chunk: Buffer) => {
|
||||
totalBytes += chunk.length;
|
||||
});
|
||||
|
||||
transcoder.on('end', () => {
|
||||
const durationMs = (totalBytes / 192000) * 1000;
|
||||
|
||||
setTimeout(() => {
|
||||
source.unpipe(transcoder);
|
||||
transcoder.unpipe(mixerInput);
|
||||
this.mixer.removeAudioinput(mixerInput);
|
||||
transcoder.destroy();
|
||||
resolve();
|
||||
}, durationMs);
|
||||
})
|
||||
|
||||
transcoder.on('error', (err) => {
|
||||
this.mixer.removeAudioinput(mixerInput);
|
||||
resolve();
|
||||
});
|
||||
|
||||
source.pipe(transcoder).pipe(mixerInput);
|
||||
});
|
||||
}
|
||||
|
||||
public destroy(): void {
|
||||
this.player.stop();
|
||||
this.output.destroy();
|
||||
this.mixer.destroy();
|
||||
clearInterval(this.silenceInterval);
|
||||
}
|
||||
}
|
||||
|
||||
export class AudioStreamManager {
|
||||
private streams = new WeakMap<VoiceConnection, MixedStream>();
|
||||
|
||||
public getOrCreateStream(conn: VoiceConnection): MixedStream {
|
||||
let stream = this.streams.get(conn);
|
||||
if (stream) return stream;
|
||||
|
||||
stream = new MixedStream();
|
||||
this.streams.set(conn, stream);
|
||||
conn.subscribe(stream.player);
|
||||
return stream;
|
||||
}
|
||||
|
||||
public destroyStream(conn: VoiceConnection): void {
|
||||
const stream = this.streams.get(conn);
|
||||
if (stream) {
|
||||
stream.destroy();
|
||||
this.streams.delete(conn);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
singleton logic
|
||||
*/
|
||||
static #instance: AudioStreamManager | null = null;
|
||||
|
||||
public static get get(): AudioStreamManager {
|
||||
if (!AudioStreamManager.#instance) AudioStreamManager.#instance = new AudioStreamManager();
|
||||
return AudioStreamManager.#instance;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user