InternNav-Eval-Demo / simulation.py
jandan138's picture
fix bug
470759f
raw
history blame
7.17 kB
# simulation.py
# 仿真与视频相关
import os
import time
import uuid
import cv2
import numpy as np
from typing import List
import gradio as gr
from backend_api import get_task_status
from oss_utils import list_oss_files, download_oss_file, get_user_tmp_dir
def stream_simulation_results(result_folder: str, task_id: str, request: gr.Request, fps: int = 6):
    """Stream simulation frames stored on OSS as short video segments.

    Roughly once per second the ``images`` sub-folder of ``result_folder`` is
    listed, files that have not been handled yet are downloaded and decoded,
    and a video segment is yielded once ``fps * 2`` frames (two seconds of
    video) have accumulated. The backend task status is queried at most every
    5 seconds; when it reports ``completed`` the remaining images are fetched
    and flushed as one final segment.

    Args:
        result_folder: OSS folder that contains the generated ``images`` folder.
        task_id: Backend task id used for status queries.
        request: Gradio request object; its ``session_hash`` selects the
            per-user temporary directory.
        fps: Frame rate of the produced video segments.

    Yields:
        Path of each generated video segment file.

    Raises:
        gr.Error: If the backend reports the task as ``failed``, or if 240
            polling rounds elapse without the task completing or terminating.
    """
    image_folder = os.path.join(result_folder, "images")
    frame_buffer: List[np.ndarray] = []
    frames_per_segment = fps * 2  # emit one segment per two seconds of video
    processed_files = set()
    width, height = 0, 0
    last_status_check = 0  # 0 forces a status query on the very first round
    status_check_interval = 5  # seconds between backend status queries
    max_rounds = 240

    # Local scratch directory for the downloaded images.
    user_dir = get_user_tmp_dir(request.session_hash)
    local_image_dir = os.path.join(user_dir, task_id, "images")
    os.makedirs(local_image_dir, exist_ok=True)

    # for/else: the timeout is raised only when the loop runs out of rounds,
    # never when it is left through ``break`` (even on the last round).
    for _ in range(max_rounds):
        current_time = time.time()

        # Periodically check the backend task status.
        if current_time - last_status_check > status_check_interval:
            status = get_task_status(task_id)
            print(f"Session {request.session_hash}, status: {status}")
            state = status.get("status")
            if state == "completed":
                # Make sure every image generated so far is picked up.
                process_remaining_oss_images(image_folder, local_image_dir, processed_files, frame_buffer)
                if frame_buffer:
                    if width == 0:
                        # No frame was decoded by the polling loop below, so
                        # the frame size has not been recorded yet.
                        height, width = frame_buffer[0].shape[:2]
                    yield create_video_segment(frame_buffer, fps, width, height, request)
                break
            elif state == "failed":
                raise gr.Error(f"任务执行失败: {status.get('result', '未知错误')}")
            elif state == "terminated":
                break
            last_status_check = current_time

        # Fetch the current file listing from OSS.
        try:
            oss_files = list_oss_files(image_folder)
            new_files = [f for f in oss_files if f not in processed_files]
            has_new_frames = False
            for oss_path in new_files:
                try:
                    # Download the file locally, then decode it.
                    filename = os.path.basename(oss_path)
                    local_path = os.path.join(local_image_dir, filename)
                    download_oss_file(oss_path, local_path)
                    frame = cv2.imread(local_path)
                    if frame is not None:
                        if width == 0:  # first decoded frame fixes the video size
                            height, width = frame.shape[:2]
                        frame_buffer.append(frame)
                        processed_files.add(oss_path)
                        has_new_frames = True
                except Exception as e:
                    print(f"Error processing {oss_path}: {e}")

            # Emit a segment once enough new frames have accumulated.
            if has_new_frames and len(frame_buffer) >= frames_per_segment:
                segment_frames = frame_buffer[:frames_per_segment]
                frame_buffer = frame_buffer[frames_per_segment:]
                yield create_video_segment(segment_frames, fps, width, height, request)
        except Exception as e:
            print(f"Error accessing OSS: {e}")

        time.sleep(1)  # avoid polling too aggressively
    else:
        raise gr.Error("timeout 240s")
def create_video_segment(frames: List[np.ndarray], fps: int, width: int, height: int, request: gr.Request) -> str:
    """Write ``frames`` to a new MP4 file and return its path.

    The file is created in the ``video_chunks`` directory under the session's
    temporary directory, with a random UUID in its name.

    Args:
        frames: Decoded image frames to write, in playback order.
        fps: Frame rate passed to the video writer.
        width: Frame width in pixels. If this or ``height`` is not positive,
            both are taken from the first frame instead.
        height: Frame height in pixels.
        request: Gradio request object; its ``session_hash`` selects the
            per-user temporary directory.

    Returns:
        Path of the written video file.
    """
    # A caller that has not recorded the frame size yet passes 0; opening the
    # writer with (0, 0) cannot produce a usable video, so fall back to the
    # actual frame shape.
    if frames and (width <= 0 or height <= 0):
        height, width = frames[0].shape[:2]

    user_dir = get_user_tmp_dir(request.session_hash)
    video_chunk_dir = os.path.join(user_dir, "video_chunks")
    os.makedirs(video_chunk_dir, exist_ok=True)
    segment_name = os.path.join(video_chunk_dir, f"output_{uuid.uuid4()}.mp4")

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(segment_name, fourcc, fps, (width, height))
    try:
        for frame in frames:
            out.write(frame)
    finally:
        # Always finalize the file, even if writing a frame fails.
        out.release()
    return segment_name
def process_remaining_oss_images(oss_folder: str, local_dir: str, processed_files: set, frame_buffer: List[np.ndarray]):
    """Download and decode the OSS images that have not been handled yet.

    Every file listed under ``oss_folder`` that is not in ``processed_files``
    is downloaded into ``local_dir`` and read with OpenCV. Each successfully
    decoded frame is appended to ``frame_buffer`` and its OSS path is added to
    ``processed_files``; both containers are modified in place. Errors are
    printed and otherwise ignored.

    Args:
        oss_folder: OSS folder to list.
        local_dir: Local directory that receives the downloaded files.
        processed_files: Set of OSS paths that were already handled.
        frame_buffer: List that collects the decoded frames.
    """
    try:
        # Snapshot the pending paths before the loop mutates processed_files.
        pending = [path for path in list_oss_files(oss_folder) if path not in processed_files]
        for oss_path in pending:
            try:
                target = os.path.join(local_dir, os.path.basename(oss_path))
                download_oss_file(oss_path, target)
                image = cv2.imread(target)
                if image is None:
                    # Unreadable image: leave it unmarked and move on.
                    continue
                frame_buffer.append(image)
                processed_files.add(oss_path)
            except Exception as e:
                print(f"Error processing remaining {oss_path}: {e}")
    except Exception as e:
        print(f"Error accessing OSS for remaining files: {e}")
def convert_to_h264(video_path):
    """Re-encode a video as an H.264 MP4 using ffmpeg.

    The output is written next to the input, with ``_h264.mp4`` appended to
    the input path's stem. An existing output file is overwritten.

    Args:
        video_path: Path of the video to convert.

    Returns:
        Path of the converted file.

    Raises:
        RuntimeError: If no ffmpeg executable is found on ``PATH`` or in the
            known fallback locations.
        gr.Error: If ffmpeg exits with an error, or if any other failure
            occurs while converting (including a missing output file).
    """
    import shutil
    import subprocess

    base, _ = os.path.splitext(video_path)
    video_path_h264 = f"{base}_h264.mp4"

    # Locate ffmpeg: PATH first, then a few common install locations.
    ffmpeg_bin = shutil.which("ffmpeg")
    if ffmpeg_bin is None:
        possible_paths = [
            "/root/anaconda3/envs/gradio/bin/ffmpeg",
            "/usr/bin/ffmpeg",
            "/usr/local/bin/ffmpeg",
        ]
        ffmpeg_bin = next((path for path in possible_paths if os.path.exists(path)), None)
    if ffmpeg_bin is None:
        raise RuntimeError("❌ 找不到 ffmpeg,请确保其已安装并在 PATH 中")

    ffmpeg_cmd = [
        ffmpeg_bin,
        "-y",  # overwrite an existing output instead of prompting
        "-i", video_path,
        "-c:v", "libx264",
        "-preset", "slow",
        "-crf", "23",
        "-c:a", "aac",
        "-movflags", "+faststart",
        video_path_h264,
    ]
    try:
        # stdin is detached so ffmpeg can never block waiting for input.
        subprocess.run(
            ffmpeg_cmd,
            check=True,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if not os.path.exists(video_path_h264):
            raise FileNotFoundError(f"⚠️ H.264 文件未生成: {video_path_h264}")
        return video_path_h264
    except subprocess.CalledProcessError as e:
        # stderr is captured as bytes; decode it so the message is readable
        # rather than a b'...' repr.
        stderr_text = e.stderr
        if isinstance(stderr_text, bytes):
            stderr_text = stderr_text.decode("utf-8", errors="replace")
        raise gr.Error(f"FFmpeg 转换失败: {stderr_text}") from e
    except Exception as e:
        raise gr.Error(f"转换过程中发生错误: {str(e)}") from e