Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| # simulation.py | |
| # 仿真与视频相关 | |
| import os | |
| import time | |
| import uuid | |
| import cv2 | |
| import numpy as np | |
| from typing import List | |
| import gradio as gr | |
| from backend_api import get_task_status | |
| from oss_utils import list_oss_files, download_oss_file, get_user_tmp_dir, test_oss_access, clean_oss_result_path | |
| def stream_simulation_results(result_folder: str, task_id: str, request: gr.Request, fps: int = 6): | |
| """ | |
| 流式输出仿真结果,从OSS读取图片 | |
| 参数: | |
| result_folder: OSS上包含生成图片的文件夹路径 | |
| task_id: 后端任务ID用于状态查询 | |
| request: Gradio请求对象 | |
| fps: 输出视频的帧率 | |
| 生成: | |
| 生成的视频文件路径 (分段输出) | |
| """ | |
| # 初始化变量 | |
| image_folder = os.path.join(result_folder, "images") | |
| frame_buffer: List[np.ndarray] = [] | |
| frames_per_segment = fps * 2 # 每2秒输出一段 | |
| processed_files = set() | |
| width, height = 0, 0 | |
| last_status_check = 0 | |
| status_check_interval = 5 # 每5秒检查一次后端状态 | |
| max_time = 240 | |
| # 创建临时目录存储下载的图片 | |
| user_dir = get_user_tmp_dir(request.session_hash) | |
| local_image_dir = os.path.join(user_dir, task_id, "images") | |
| os.makedirs(local_image_dir, exist_ok=True) | |
| while max_time > 0: | |
| max_time -= 1 | |
| current_time = time.time() | |
| # 定期检查后端状态 | |
| if current_time - last_status_check > status_check_interval: | |
| status = get_task_status(task_id) | |
| print(f"Session {request.session_hash}, status: {status}") | |
| if status.get("status") == "completed": | |
| # 确保处理完所有已生成的图片 | |
| process_remaining_oss_images(image_folder, local_image_dir, processed_files, frame_buffer) | |
| if frame_buffer: | |
| yield create_video_segment(frame_buffer, fps, width, height, request) | |
| break | |
| elif status.get("status") == "failed": | |
| raise gr.Error(f"任务执行失败: {status.get('result', '未知错误')}") | |
| elif status.get("status") == "terminated": | |
| break | |
| last_status_check = current_time | |
| # 从OSS获取文件列表 | |
| try: | |
| oss_files = list_oss_files(image_folder) | |
| new_files = [f for f in oss_files if f not in processed_files] | |
| has_new_frames = False | |
| for oss_path in new_files: | |
| try: | |
| # 下载文件到本地 | |
| filename = os.path.basename(oss_path) | |
| local_path = os.path.join(local_image_dir, filename) | |
| download_oss_file(oss_path, local_path) | |
| # 读取图片 | |
| frame = cv2.imread(local_path) | |
| if frame is not None: | |
| if width == 0: # 第一次获取图像尺寸 | |
| height, width = frame.shape[:2] | |
| frame_buffer.append(frame) | |
| processed_files.add(oss_path) | |
| has_new_frames = True | |
| except Exception as e: | |
| print(f"Error processing {oss_path}: {e}") | |
| # 如果有新帧且积累够指定帧数,输出视频片段 | |
| if has_new_frames and len(frame_buffer) >= frames_per_segment: | |
| segment_frames = frame_buffer[:frames_per_segment] | |
| frame_buffer = frame_buffer[frames_per_segment:] | |
| yield create_video_segment(segment_frames, fps, width, height, request) | |
| except Exception as e: | |
| print(f"Error accessing OSS: {e}") | |
| time.sleep(1) # 避免过于频繁检查 | |
| if max_time <= 0: | |
| raise gr.Error("timeout 240s") | |
| def create_video_segment(frames: List[np.ndarray], fps: int, width: int, height: int, request: gr.Request) -> str: | |
| """创建视频片段""" | |
| user_dir = get_user_tmp_dir(request.session_hash) | |
| video_chunk_dir = os.path.join(user_dir, "video_chunks") | |
| os.makedirs(video_chunk_dir, exist_ok=True) | |
| segment_name = os.path.join(video_chunk_dir, f"output_{uuid.uuid4()}.mp4") | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(segment_name, fourcc, fps, (width, height)) | |
| for frame in frames: | |
| out.write(frame) | |
| out.release() | |
| return segment_name | |
| def process_remaining_oss_images(oss_folder: str, local_dir: str, processed_files: set, frame_buffer: List[np.ndarray]): | |
| """处理OSS上剩余的图片""" | |
| try: | |
| oss_files = list_oss_files(oss_folder) | |
| new_files = [f for f in oss_files if f not in processed_files] | |
| for oss_path in new_files: | |
| try: | |
| # 下载文件到本地 | |
| filename = os.path.basename(oss_path) | |
| local_path = os.path.join(local_dir, filename) | |
| download_oss_file(oss_path, local_path) | |
| # 读取图片 | |
| frame = cv2.imread(local_path) | |
| if frame is not None: | |
| frame_buffer.append(frame) | |
| processed_files.add(oss_path) | |
| except Exception as e: | |
| print(f"Error processing remaining {oss_path}: {e}") | |
| except Exception as e: | |
| print(f"Error accessing OSS for remaining files: {e}") | |
| def convert_to_h264(video_path): | |
| """ | |
| 将视频转换为 H.264 编码的 MP4 格式 | |
| 生成新文件路径在原路径基础上添加 _h264 后缀 | |
| """ | |
| import shutil | |
| import subprocess | |
| base, ext = os.path.splitext(video_path) | |
| video_path_h264 = f"{base}_h264.mp4" | |
| # 查找ffmpeg | |
| ffmpeg_bin = shutil.which("ffmpeg") | |
| if ffmpeg_bin is None: | |
| # 尝试常见的安装路径 | |
| possible_paths = [ | |
| "/root/anaconda3/envs/gradio/bin/ffmpeg", | |
| "/usr/bin/ffmpeg", | |
| "/usr/local/bin/ffmpeg" | |
| ] | |
| for path in possible_paths: | |
| if os.path.exists(path): | |
| ffmpeg_bin = path | |
| break | |
| if ffmpeg_bin is None: | |
| raise RuntimeError("❌ 找不到 ffmpeg,请确保其已安装并在 PATH 中") | |
| ffmpeg_cmd = [ | |
| ffmpeg_bin, | |
| "-i", video_path, | |
| "-c:v", "libx264", | |
| "-preset", "slow", | |
| "-crf", "23", | |
| "-c:a", "aac", | |
| "-movflags", "+faststart", | |
| video_path_h264 | |
| ] | |
| try: | |
| result = subprocess.run(ffmpeg_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| if not os.path.exists(video_path_h264): | |
| raise FileNotFoundError(f"⚠️ H.264 文件未生成: {video_path_h264}") | |
| return video_path_h264 | |
| except subprocess.CalledProcessError as e: | |
| raise gr.Error(f"FFmpeg 转换失败: {e.stderr}") | |
| except Exception as e: | |
| raise gr.Error(f"转换过程中发生错误: {str(e)}") | |
| def create_final_video_from_oss_images(result_folder: str, task_id: str, request: gr.Request, fps: int = 6) -> str: | |
| """ | |
| 从OSS上的所有图片拼接成最终完整视频 | |
| 参数: | |
| result_folder: OSS上的结果文件夹路径 | |
| task_id: 任务ID | |
| request: Gradio请求对象 | |
| fps: 视频帧率 (6帧每秒) | |
| 返回: | |
| 最终视频文件路径 | |
| """ | |
| # 使用统一的路径清理函数 | |
| cleaned_result_folder = clean_oss_result_path(result_folder, task_id) | |
| # 获取图片文件夹路径 | |
| image_folder = os.path.join(cleaned_result_folder, "images").replace('\\', '/') | |
| user_dir = get_user_tmp_dir(request.session_hash) | |
| local_image_dir = os.path.join(user_dir, task_id, "final_images") | |
| os.makedirs(local_image_dir, exist_ok=True) | |
| # 添加调试信息 | |
| print(f"🔍 DEBUG: original result_folder = {result_folder}") | |
| print(f"🔍 DEBUG: cleaned_result_folder = {cleaned_result_folder}") | |
| print(f"🔍 DEBUG: task_id = {task_id}") | |
| print(f"🔍 DEBUG: image_folder = {image_folder}") | |
| print(f"🔍 DEBUG: user_dir = {user_dir}") | |
| print(f"🔍 DEBUG: local_image_dir = {local_image_dir}") | |
| try: | |
| # 首先测试OSS访问 | |
| print(f"🔍 DEBUG: Testing OSS access for task {task_id}") | |
| test_oss_access(task_id) | |
| # 1. 获取OSS上的所有图片文件 | |
| print(f"🔍 DEBUG: Listing files in OSS folder: {image_folder}") | |
| oss_files = list_oss_files(image_folder) | |
| print(f"🔍 DEBUG: Total files found in OSS: {len(oss_files)}") | |
| print(f"🔍 DEBUG: First 10 files: {oss_files[:10]}") | |
| image_files = [f for f in oss_files if f.lower().endswith(('.png', '.jpg', '.jpeg'))] | |
| print(f"🔍 DEBUG: Image files found: {len(image_files)}") | |
| print(f"🔍 DEBUG: First 5 image files: {image_files[:5]}") | |
| if not image_files: | |
| # 如果没有找到图片,尝试直接在cleaned_result_folder中查找 | |
| print(f"🔍 DEBUG: No images in {image_folder}, trying {cleaned_result_folder}") | |
| oss_files_direct = list_oss_files(cleaned_result_folder) | |
| print(f"🔍 DEBUG: Files in result_folder: {len(oss_files_direct)}") | |
| print(f"🔍 DEBUG: Direct files: {oss_files_direct[:10]}") | |
| # 查找所有子目录中的图片 | |
| all_image_files = [f for f in oss_files_direct if f.lower().endswith(('.png', '.jpg', '.jpeg'))] | |
| print(f"🔍 DEBUG: All image files in result_folder: {len(all_image_files)}") | |
| # 尝试不同的路径组合 | |
| alternative_paths = [ | |
| f"gradio_demo/tasks/{task_id}/images", | |
| f"gradio_demo/tasks/{task_id}", | |
| f"tasks/{task_id}/images", | |
| f"tasks/{task_id}", | |
| # 根据您确认的路径格式添加 | |
| f"gradio_demo/tasks/{task_id}/image", # 可能是单数形式 | |
| f"gradio_demo/tasks/{task_id}/imgs", # 可能是缩写 | |
| ] | |
| for alt_path in alternative_paths: | |
| print(f"🔍 DEBUG: Trying alternative path: {alt_path}") | |
| alt_files = list_oss_files(alt_path) | |
| alt_images = [f for f in alt_files if f.lower().endswith(('.png', '.jpg', '.jpeg'))] | |
| print(f"🔍 DEBUG: Found {len(alt_images)} images in {alt_path}") | |
| if alt_images: | |
| all_image_files = alt_images | |
| print(f"🔍 DEBUG: Using images from alternative path: {alt_path}") | |
| break | |
| if all_image_files: | |
| image_files = all_image_files | |
| print(f"🔍 DEBUG: Using image files from alternative search: {len(image_files)}") | |
| else: | |
| raise gr.Error("No images found in OSS for final video creation") | |
| # 2. 按文件名排序确保时间顺序正确 | |
| image_files.sort(key=lambda x: os.path.splitext(os.path.basename(x))[0]) | |
| print(f"🔍 DEBUG: Sorted image files (first 5): {[os.path.basename(f) for f in image_files[:5]]}") | |
| # 3. 下载所有图片到本地 | |
| frames = [] | |
| width, height = 0, 0 | |
| print(f"🔍 DEBUG: Starting to download {len(image_files)} images...") | |
| for i, oss_path in enumerate(image_files): | |
| try: | |
| filename = os.path.basename(oss_path) | |
| local_path = os.path.join(local_image_dir, filename) | |
| download_oss_file(oss_path, local_path) | |
| # 读取图片 | |
| frame = cv2.imread(local_path) | |
| if frame is not None: | |
| if width == 0: # 第一次获取图像尺寸 | |
| height, width = frame.shape[:2] | |
| print(f"🔍 DEBUG: Image dimensions: {width}x{height}") | |
| frames.append(frame) | |
| if (i + 1) % 10 == 0: # 每10张图片输出一次进度 | |
| print(f"🔍 DEBUG: Downloaded {i + 1}/{len(image_files)} images") | |
| except Exception as e: | |
| print(f"❌ Error processing {oss_path}: {e}") | |
| continue | |
| print(f"🔍 DEBUG: Successfully loaded {len(frames)} frames") | |
| if not frames: | |
| raise gr.Error("No valid images could be processed for final video") | |
| # 4. 创建最终视频 | |
| final_video_dir = os.path.join(user_dir, task_id, "final") | |
| os.makedirs(final_video_dir, exist_ok=True) | |
| final_video_path = os.path.join(final_video_dir, "final_video.mp4") | |
| print(f"🔍 DEBUG: Creating video at: {final_video_path}") | |
| print(f"🔍 DEBUG: Video parameters: {len(frames)} frames, {fps} fps, {width}x{height}") | |
| # 使用OpenCV创建视频 | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(final_video_path, fourcc, fps, (width, height)) | |
| for frame in frames: | |
| out.write(frame) | |
| out.release() | |
| # 5. 转换为H.264格式 | |
| print(f"🔍 DEBUG: Converting to H.264...") | |
| h264_video_path = convert_to_h264(final_video_path) | |
| print(f"✅ Final video created: {h264_video_path} with {len(frames)} frames at {fps} fps") | |
| return h264_video_path | |
| except Exception as e: | |
| print(f"❌ Error creating final video from OSS images: {e}") | |
| print(f"❌ Exception type: {type(e).__name__}") | |
| import traceback | |
| print(f"❌ Traceback: {traceback.format_exc()}") | |
| raise gr.Error(f"Failed to create final video: {str(e)}") | |