// /common/StreamingContext.js import BlockingQueue from './BlockingQueue.js'; // 使用我们之前适配过的版本 /** * 适配uniapp小程序环境的音频流处理上下文。 * 它负责解码Opus流,将其缓冲为PCM数据,并在需要时转换为WAV文件进行播放。 */ export class StreamingContext { /** * @param {object} opusDecoder - 已初始化的Opus解码器实例 * @param {InnerAudioContext} innerAudioContext - uniapp创建的音频播放器实例 * @param {number} sampleRate - 采样率 (e.g., 16000) * @param {number} channels - 声道数 (e.g., 1) */ constructor(opusDecoder, innerAudioContext, sampleRate, channels) { this.opusDecoder = opusDecoder; this.innerAudioContext = innerAudioContext; this.sampleRate = sampleRate; this.channels = channels; // 输入队列,用于接收来自WebSocket的原始Opus数据包 this.inputQueue = new BlockingQueue(); // 内部PCM缓冲,存储解码后的Int16数据块 this._pcmBuffer = []; this._totalPcmSamples = 0; // 状态标记 this._isDecoding = false; this._isStopped = false; } /** * 外部调用:向队列中添加待解码的原始Opus数据。 * @param {Uint8Array} opusFrame - 从WebSocket收到的Opus数据帧 */ pushOpusFrame(opusFrame) { if (opusFrame && opusFrame.length > 0) { this.inputQueue.enqueue(opusFrame); } } /** * 启动解码循环。这个循环会持续运行,直到被stop()。 * 它会阻塞地等待`inputQueue`中的数据。 */ async startDecodingLoop() { if (this._isDecoding) return; // 防止重复启动 if (!this.opusDecoder) { console.log('Opus解码器未初始化,无法启动解码循环', 'error'); return; } this._isDecoding = true; this._isStopped = false; console.log('Opus解码循环已启动', 'info'); while (!this._isStopped) { try { // 阻塞式地等待数据帧,可以一次性取出多个 const framesToDecode = await this.inputQueue.dequeue(1, Infinity); if (this._isStopped) break; // 检查在等待后是否被停止 for (const frame of framesToDecode) { const pcmFrame = this.opusDecoder.decode(frame); // 假设返回 Int16Array if (pcmFrame && pcmFrame.length > 0) { this._pcmBuffer.push(pcmFrame); this._totalPcmSamples += pcmFrame.length; } } } catch (error) { console.log(`解码循环中出错: ${error.message}`, 'error'); } } this._isDecoding = false; console.log('Opus解码循环已停止', 'info'); } /** * 外部调用:将当前所有缓冲的PCM数据转换成WAV文件并播放。 */ async playBufferedAudio() { if (this.innerAudioContext.paused === false) { console.log('播放器正在播放,本次播放请求被忽略', 'warning'); return; } if (this._pcmBuffer.length === 0) { console.log('PCM缓冲区为空,无需播放', 'info'); return; } // 1. 合并所有PCM数据块 const totalSamples = this._totalPcmSamples; const fullPcmData = new Int16Array(totalSamples); let offset = 0; for (const pcmChunk of this._pcmBuffer) { fullPcmData.set(pcmChunk, offset); offset += pcmChunk.length; } // 清空缓冲区以便下一次会话 this.reset(); console.log(`准备播放,总样本数: ${totalSamples}`, 'info'); // 2. 转换为WAV格式 (ArrayBuffer) const wavData = this._pcmToWav(fullPcmData, this.sampleRate, this.channels); // 3. 写入临时文件并播放 const fs = wx.getFileSystemManager(); const tempFilePath = `${wx.env.USER_DATA_PATH}/temp_audio_${Date.now()}.wav`; fs.writeFile({ filePath: tempFilePath, data: wavData, encoding: 'binary', success: () => { console.log(`WAV文件写入成功: ${tempFilePath}`, 'success'); this.innerAudioContext.src = tempFilePath; this.innerAudioContext.play(); }, fail: (err) => { console.log(`WAV文件写入失败: ${JSON.stringify(err)}`, 'error'); } }); } /** * 重置上下文状态,清空所有缓冲区,为下一次语音会话做准备。 */ reset() { this._pcmBuffer = []; this._totalPcmSamples = 0; // 注意:不要重置 inputQueue,因为它可能有预读的数据 console.log('StreamingContext已重置', 'info'); } /** * 停止解码循环并清理资源。 */ stop() { this._isStopped = true; // 通过入队一个null值来唤醒可能在dequeue上阻塞的循环,使其能检查到_isStopped标志 this.inputQueue.enqueue(null); } /** * [核心辅助函数] 将Int16 PCM数据转换为WAV文件格式的ArrayBuffer * @param {Int16Array} pcmData - 原始PCM数据 * @param {number} sampleRate - 采样率 * @param {number} channels - 声道数 * @returns {ArrayBuffer} */ _pcmToWav(pcmData, sampleRate, channels) { const bitsPerSample = 16; const dataSize = pcmData.length * (bitsPerSample / 8); const fileSize = 44 + dataSize; const buffer = new ArrayBuffer(fileSize); const view = new DataView(buffer); // 写入WAV文件头 // RIFF chunk descriptor this._writeString(view, 0, 'RIFF'); view.setUint32(4, fileSize - 8, true); // fileSize this._writeString(view, 8, 'WAVE'); // "fmt " sub-chunk this._writeString(view, 12, 'fmt '); view.setUint32(16, 16, true); // chunkSize view.setUint16(20, 1, true); // audioFormat (1 for PCM) view.setUint16(22, channels, true); // numChannels view.setUint32(24, sampleRate, true); // sampleRate view.setUint32(28, sampleRate * channels * (bitsPerSample / 8), true); // byteRate view.setUint16(32, channels * (bitsPerSample / 8), true); // blockAlign view.setUint16(34, bitsPerSample, true); // bitsPerSample // "data" sub-chunk this._writeString(view, 36, 'data'); view.setUint32(40, dataSize, true); // 写入PCM数据 for (let i = 0; i < pcmData.length; i++) { view.setInt16(44 + i * 2, pcmData[i], true); } return buffer; } _writeString(view, offset, string) { for (let i = 0; i < string.length; i++) { view.setUint8(offset + i, string.charCodeAt(i)); } } } /** * 创建StreamingContext实例的工厂函数 * @param {object} opusDecoder * @param {InnerAudioContext} innerAudioContext * @param {number} sampleRate * @param {number} channels * @returns {StreamingContext} */ export function createStreamingContext(opusDecoder, innerAudioContext, sampleRate, channels) { return new StreamingContext(opusDecoder, innerAudioContext, sampleRate, channels); }