Spaces:
Running
on
Zero
Running
on
Zero
added tests for processing video
Browse files
- src/app.py +28 -7
- tests/test_video.py +82 -7
src/app.py
CHANGED
|
@@ -1,8 +1,13 @@
|
|
| 1 |
import torch
|
| 2 |
from huggingface_hub import login
|
| 3 |
from collections.abc import Iterator
|
| 4 |
-
from transformers import
|
|
|
|
|
|
|
|
|
|
|
|
|
| 5 |
import spaces
|
|
|
|
| 6 |
from threading import Thread
|
| 7 |
import gradio as gr
|
| 8 |
import os
|
|
@@ -26,21 +31,21 @@ model = Gemma3ForConditionalGeneration.from_pretrained(
|
|
| 26 |
attn_implementation="eager",
|
| 27 |
)
|
| 28 |
|
|
|
|
| 29 |
def get_frames(video_path: str, max_images: int) -> list[tuple[Image.Image, float]]:
|
| 30 |
frames: list[tuple[Image.Image, float]] = []
|
| 31 |
capture = cv2.VideoCapture(video_path)
|
| 32 |
if not capture.isOpened():
|
| 33 |
raise ValueError(f"Could not open video file: {video_path}")
|
| 34 |
-
|
| 35 |
fps = capture.get(cv2.CAP_PROP_FPS)
|
| 36 |
total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
|
| 37 |
|
| 38 |
frame_interval = max(total_frames // max_images, 1)
|
|
|
|
|
|
|
| 39 |
|
| 40 |
-
|
| 41 |
-
if len(frames) >= max_images:
|
| 42 |
-
break
|
| 43 |
-
|
| 44 |
capture.set(cv2.CAP_PROP_POS_FRAMES, i)
|
| 45 |
success, image = capture.read()
|
| 46 |
if success:
|
|
@@ -49,5 +54,21 @@ def get_frames(video_path: str, max_images: int) -> list[tuple[Image.Image, floa
|
|
| 49 |
timestamp = round(i / fps, 2)
|
| 50 |
frames.append((pil_image, timestamp))
|
| 51 |
|
|
|
|
|
|
|
| 52 |
capture.release()
|
| 53 |
-
return frames
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
import torch
|
| 2 |
from huggingface_hub import login
|
| 3 |
from collections.abc import Iterator
|
| 4 |
+
from transformers import (
|
| 5 |
+
Gemma3ForConditionalGeneration,
|
| 6 |
+
TextIteratorStreamer,
|
| 7 |
+
Gemma3Processor,
|
| 8 |
+
)
|
| 9 |
import spaces
|
| 10 |
+
import tempfile
|
| 11 |
from threading import Thread
|
| 12 |
import gradio as gr
|
| 13 |
import os
|
|
|
|
| 31 |
attn_implementation="eager",
|
| 32 |
)
|
| 33 |
|
| 34 |
+
|
| 35 |
def get_frames(video_path: str, max_images: int) -> list[tuple[Image.Image, float]]:
|
| 36 |
frames: list[tuple[Image.Image, float]] = []
|
| 37 |
capture = cv2.VideoCapture(video_path)
|
| 38 |
if not capture.isOpened():
|
| 39 |
raise ValueError(f"Could not open video file: {video_path}")
|
| 40 |
+
|
| 41 |
fps = capture.get(cv2.CAP_PROP_FPS)
|
| 42 |
total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
|
| 43 |
|
| 44 |
frame_interval = max(total_frames // max_images, 1)
|
| 45 |
+
max_position = min(total_frames, max_images * frame_interval)
|
| 46 |
+
i = 0
|
| 47 |
|
| 48 |
+
while i < max_position and len(frames) < max_images:
|
|
|
|
|
|
|
|
|
|
| 49 |
capture.set(cv2.CAP_PROP_POS_FRAMES, i)
|
| 50 |
success, image = capture.read()
|
| 51 |
if success:
|
|
|
|
| 54 |
timestamp = round(i / fps, 2)
|
| 55 |
frames.append((pil_image, timestamp))
|
| 56 |
|
| 57 |
+
i += frame_interval
|
| 58 |
+
|
| 59 |
capture.release()
|
| 60 |
+
return frames
|
| 61 |
+
|
| 62 |
+
def process_video(video_path: str, max_images: int) -> list[dict]:
    """Turn sampled video frames into a flat list of text/image content items.

    Frames are obtained from ``get_frames``. Every frame contributes two
    consecutive entries: a text item labelled with the frame's timestamp,
    followed by an image item whose ``url`` is the path of a PNG written to
    a temporary file.

    Args:
        video_path: Path of the video, forwarded unchanged to ``get_frames``.
        max_images: Frame limit, forwarded unchanged to ``get_frames``.

    Returns:
        The content items, two per frame, in the order the frames were
        returned by ``get_frames``.

    Note:
        The PNG files are created with ``delete=False`` and are not removed
        by this function, so they outlive the call.
    """
    # TODO: Change max_images to slider
    frames = get_frames(video_path, max_images)

    result_content = []
    for image, timestamp in frames:
        # delete=False keeps the file on disk once the handle is closed, so
        # the path stored in the image item stays valid after this function.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file:
            image.save(temp_file.name)
        result_content.extend(
            (
                {"type": "text", "text": f"Frame {timestamp}:"},
                {"type": "image", "url": temp_file.name},
            )
        )

    logger.debug(f"Processed {len(frames)} frames from video {video_path} with frames {result_content}")
    return result_content
|
tests/test_video.py
CHANGED
|
@@ -3,25 +3,100 @@ import os
|
|
| 3 |
import cv2
|
| 4 |
from PIL import Image
|
| 5 |
from pathlib import Path
|
|
|
|
| 6 |
|
| 7 |
-
from src.app import get_frames
|
| 8 |
|
| 9 |
# Get the project root directory
|
| 10 |
ROOT_DIR = Path(__file__).parent.parent
|
| 11 |
|
|
|
|
| 12 |
def test_correct_frame_return():
|
| 13 |
"""Test that get_frames returns a list of (Image, float) tuples."""
|
| 14 |
# Path to a test video file
|
| 15 |
video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
|
| 16 |
-
|
| 17 |
# Ensure the test video exists
|
| 18 |
-
assert os.path.exists(video_path)
|
| 19 |
-
|
| 20 |
# Test with a small number of frames
|
| 21 |
max_images = 3
|
| 22 |
frames = get_frames(video_path, max_images)
|
| 23 |
-
|
| 24 |
-
# Check return type
|
| 25 |
assert isinstance(frames, list)
|
| 26 |
assert all(isinstance(item, tuple) and len(item) == 2 for item in frames)
|
| 27 |
-
assert all(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 3 |
import cv2
|
| 4 |
from PIL import Image
|
| 5 |
from pathlib import Path
|
| 6 |
+
import tempfile
|
| 7 |
|
| 8 |
+
from src.app import get_frames, process_video
|
| 9 |
|
| 10 |
# Get the project root directory
|
| 11 |
ROOT_DIR = Path(__file__).parent.parent
|
| 12 |
|
| 13 |
+
|
| 14 |
def test_correct_frame_return():
    """Check that get_frames yields a list of ``(PIL image, float)`` pairs."""
    # The sample clip ships with the repository under assets/.
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    assert os.path.exists(video_path)

    # A small frame budget keeps the test quick.
    max_images = 3
    frames = get_frames(video_path, max_images)

    assert isinstance(frames, list)
    for entry in frames:
        # Validate the container shape before unpacking it.
        assert isinstance(entry, tuple) and len(entry) == 2
    for picture, stamp in frames:
        assert isinstance(picture, Image.Image) and isinstance(stamp, float)
|
| 31 |
+
|
| 32 |
+
|
| 33 |
+
def test_process_video_structure():
    """Test that process_video returns alternating text and image items.

    For every requested frame the result must contain a text item whose
    text starts with ``"Frame "``, immediately followed by an image item
    whose ``url`` points at an existing, valid image file.
    """
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    max_images = 2

    result = process_video(video_path, max_images)

    # Should have 2 items (text + image) per frame
    assert len(result) == max_images * 2

    # Walk the list pairwise: even index is the text label, odd is the image.
    for i in range(0, len(result), 2):
        text_item, image_item = result[i], result[i + 1]

        # Text item
        assert text_item["type"] == "text"
        assert text_item["text"].startswith("Frame ")

        # Image item
        assert image_item["type"] == "image"
        assert "url" in image_item
        assert os.path.exists(image_item["url"])

        # Verify the image file is valid. The context manager guarantees the
        # file handle is released even when verify() raises.
        try:
            with Image.open(image_item["url"]) as img:
                img.verify()
        except Exception as e:
            # Deliberately broad: any failure to open or verify means the
            # file is not a usable image, which is reported as a test failure.
            pytest.fail(f"Invalid image file: {e}")
|
| 61 |
+
|
| 62 |
+
|
| 63 |
+
def test_process_video_timestamps():
    """Check that frame labels carry parseable, non-decreasing timestamps."""
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    max_images = 3

    result = process_video(video_path, max_images)

    # Labels sit at the even positions and look like "Frame X.XX:".
    labels = [item["text"] for item in result[::2] if item["type"] == "text"]
    timestamps = [float(label.split()[1].rstrip(":")) for label in labels]

    # One timestamp per requested frame, in ascending order.
    assert len(timestamps) == max_images
    assert all(earlier <= later for earlier, later in zip(timestamps, timestamps[1:]))
|
| 82 |
+
|
| 83 |
+
|
| 84 |
+
def test_process_video_temp_files():
    """Test that process_video writes each frame to a ``.png`` file on disk.

    The file referenced by the image item must still exist after
    process_video returns. The test removes it afterwards so repeated runs
    do not accumulate PNGs in the temp directory.
    """
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    max_images = 1

    result = process_video(video_path, max_images)

    # Index 1 is the image item that follows the first text label.
    image_path = result[1]["url"]
    try:
        # Verify temp file exists
        assert os.path.exists(image_path)
        assert image_path.endswith(".png")
    finally:
        # Clean up even when an assertion above fails.
        if os.path.exists(image_path):
            os.remove(image_path)
|
| 96 |
+
|
| 97 |
+
|
| 98 |
+
def test_process_video_invalid_path():
    """Check that a path to a missing video makes process_video raise ValueError."""
    missing_video = "nonexistent_video.mp4"
    frame_budget = 3

    with pytest.raises(ValueError):
        process_video(missing_video, frame_budget)
|