# EchoX / app.py
import subprocess
import sys
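# Pin the toolchain before installing fairseq. Likely rationale (not stated in
# the source): fairseq v0.12.2 requires an old omegaconf, and omegaconf 2.0.x
# ships package metadata that pip >= 24.1 rejects, so pip itself is pinned first.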
subprocess.check_call([sys.executable, "-m", "pip", "install", "pip==24.0"])
subprocess.check_call([sys.executable, "-m", "pip", "install", "omegaconf==2.0.6"])
subprocess.check_call([sys.executable, "-m", "pip", "install", "git+https://github.com/facebookresearch/fairseq.git@v0.12.2"])
import gradio as gr
import os
import torch
import librosa
import soundfile as sf
import tempfile
import spaces # ZeroGPU requirement
# Import your module
import Echox_copy_stream as Echox
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Global variables
_MODEL_ON_CUDA = False
inference_model = None
def init_model():
    """Initialize the model on CPU."""
    global inference_model
    if inference_model is None:
        inference_model = Echox.EchoxAssistant()
    return inference_model
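# init_model is a lazy singleton: the first call loads the weights (on CPU),
# and later calls return the cached instance, so it is safe to call repeatedly.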
def process_audio_input(audio):
    """Process the audio input, returning a usable file path."""
    if audio is None:
        return None
    try:
        # If it is already a file path, return it directly
        if isinstance(audio, str):
            return audio
        # If it is a numpy-style (sr, data) tuple
        if isinstance(audio, tuple):
            sr, y = audio
            if y.ndim > 1:
                y = y[:, 0]  # keep only the first channel
        else:
            # If it is a bare array
            y = audio
            sr = 16000  # default sample rate
        # Save to a temporary WAV file
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
            sf.write(tmp_file.name, y, sr)
        return tmp_file.name
    except Exception as e:
        print(f"Error processing audio: {e}")
        return None
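# Illustrative call shapes this helper accepts (the tuple/array branches are
# not exercised here, since gr.Audio(type="filepath") always passes a path):
#   process_audio_input("/path/to/clip.wav")     -> "/path/to/clip.wav"
#   process_audio_input((16000, samples_array))  -> path to a temp .wav file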
@spaces.GPU(duration=180)  # use ZeroGPU, 3-minute timeout
def process_audio_text(audio):
    """Main processing function."""
    global _MODEL_ON_CUDA, inference_model

    # Initialize the model (if not already initialized)
    if inference_model is None:
        init_model()
    # Move the model to the GPU on first use
    if not _MODEL_ON_CUDA:
        try:
            if hasattr(inference_model, 'model'):
                inference_model.model = inference_model.model.to("cuda")
            if hasattr(inference_model, 'unit_translator'):
                inference_model.unit_translator = inference_model.unit_translator.to("cuda")
            inference_model.device = "cuda"
            _MODEL_ON_CUDA = True
            print("Model moved to GPU")
        except Exception as e:
            print(f"Error moving model to GPU: {e}")
    # Process the audio input
    audio_path = process_audio_input(audio)

    text = ""
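    # Payload schema inferred from this call site: EchoxAssistant._inference
    # takes a list of samples, each with a "conversations" list of turns that
    # carry "from", "value" (text), and "audio" (a file path or None).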
    tmp = [{
        "conversations": [
            {
                "from": "user",
                "value": text,
                "audio": audio_path
            }
        ]
    }]
    accumulated_text = ""
    try:
        # _inference is a generator; each step may carry updated text and/or
        # a chunk of synthesized audio, which is streamed to Gradio as it arrives.
        for text_response, audio_data in inference_model._inference(tmp):
            if text_response:
                accumulated_text = text_response
            if audio_data is not None:
                sr, audio_array = audio_data
                yield accumulated_text, (sr, audio_array)
            else:
                yield accumulated_text, None
    except Exception as e:
        yield f"Error: {str(e)}", None
    finally:
        # Clean up the temporary file (only if one was created here)
        if audio_path and audio_path != audio and os.path.exists(audio_path):
            try:
                os.unlink(audio_path)
            except OSError:
                pass
# Initialize the model (on CPU) at import time
init_model()
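# Note: on ZeroGPU Spaces, CUDA is only available inside @spaces.GPU functions,
# which is presumably why the weights are loaded on CPU here at import time and
# only moved to the GPU lazily inside process_audio_text.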
if __name__ == "__main__":
    examples = [
        ["./show_case/1.wav"],
        ["./show_case/2.wav"],
    ]
    iface = gr.Interface(
        fn=process_audio_text,
        inputs=[
            gr.Audio(type="filepath", label="Upload Audio")
        ],
        outputs=[
            # Order matches the (text, audio) tuples yielded by process_audio_text
            gr.Textbox(label="Model output"),
            gr.Audio(label="Streamed Audio", streaming=True, autoplay=True)
        ],
        examples=examples,
        live=False,
        allow_flagging="never"
    )

    iface.launch(server_name="0.0.0.0", server_port=7860, share=True)