# app.py
import streamlit as st

st.set_page_config(layout="wide")

import concurrent.futures
import os
import tempfile
import time
from pathlib import Path

import av
import cv2
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
from llama_cpp.llama_chat_format import LlamaChatCompletionHandlerRegistry, Llava15ChatHandler
from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, RTCConfiguration
# ─────────────────────────────────────────
# 1) Inline definition & registration of SmolVLM2ChatHandler
class SmolVLM2ChatHandler(Llava15ChatHandler):
    # Jinja2 chat template matching SmolVLM2's expected prompt layout
    CHAT_FORMAT = (
        "<|im_start|>"
        "{% for message in messages %}"
        "{{ message['role'] | capitalize }}"
        "{% if message['role'] == 'user' and message['content'][0]['type'] == 'image_url' %}:"
        "{% else %}: "
        "{% endif %}"
        "{% for content in message['content'] %}"
        "{% if content['type'] == 'text' %}{{ content['text'] }}"
        "{% elif content['type'] == 'image_url' %}"
        "{% if content['image_url'] is string %}"
        "{{ content['image_url'] }}\n"
        "{% elif content['image_url'] is mapping %}"
        "{{ content['image_url']['url'] }}\n"
        "{% endif %}"
        "{% endif %}"
        "{% endfor %}"
        "<end_of_utterance>\n"
        "{% endfor %}"
        "{% if add_generation_prompt %}Assistant:{% endif %}"
    )
# Overwrite any previous registration
LlamaChatCompletionHandlerRegistry().register_chat_completion_handler(
    "smolvlm2", SmolVLM2ChatHandler, overwrite=True
)
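# NOTE: registering under "smolvlm2" makes the handler resolvable via
# Llama(chat_format="smolvlm2"); below we pass a handler instance directly
# through chat_handler=, which is what actually takes effect, so the
# registration mainly guards against stale entries on script reruns.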
# ─────────────────────────────────────────
# 2) Model & CLIP files: download if missing
MODEL_FILE = "SmolVLM2-500M-Video-Instruct.Q8_0.gguf"
CLIP_FILE = "mmproj-SmolVLM2-500M-Video-Instruct-Q8_0.gguf"
MODEL_REPO = "mradermacher/SmolVLM2-500M-Video-Instruct-GGUF"
CLIP_REPO = "ggml-org/SmolVLM2-500M-Video-Instruct-GGUF"


def ensure_models():
    if not os.path.exists(MODEL_FILE):
        path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE)
        os.symlink(path, MODEL_FILE)
    if not os.path.exists(CLIP_FILE):
        path = hf_hub_download(repo_id=CLIP_REPO, filename=CLIP_FILE)
        os.symlink(path, CLIP_FILE)


ensure_models()
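# hf_hub_download caches the files in the Hugging Face cache directory and
# returns the cached path; the symlinks expose those files under the relative
# names that the Llama constructor below expects.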
@st.cache_resource  # load the model once per process, not on every Streamlit rerun
def load_llm():
    handler = SmolVLM2ChatHandler(clip_model_path=CLIP_FILE, verbose=False)
    return Llama(
        model_path=MODEL_FILE,
        chat_handler=handler,
        n_ctx=8192,
        verbose=False,
    )


llm = load_llm()
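# The mmproj (CLIP) file encodes each image into embedding tokens that the
# handler splices into the prompt; n_ctx=8192 leaves room for those image
# tokens plus the generated reply.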
# ─────────────────────────────────────────
# 3) Helper to run a single frame through the model (with debug output)
def caption_frame(frame):
    # Write the frame to a temporary JPEG so the chat handler can load it by URI
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
        cv2.imwrite(f.name, frame)
        uri = Path(f.name).absolute().as_uri()

    messages = [
        {
            "role": "system",
            "content": (
                "Focus only on describing the key dramatic action or notable event occurring "
                "in this image. Skip general context or scene-setting details unless they are "
                "crucial to understanding the main action."
            ),
        },
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": uri}},
                {"type": "text", "text": "What is happening in this image?"},
            ],
        },
    ]

    print("DEBUG ▶ caption_frame: invoking LLM")
    try:
        resp = llm.create_chat_completion(
            messages=messages,
            max_tokens=128,
            temperature=0.1,
            repeat_penalty=1.1,  # discourage exact token repeats
            stop=["<end_of_utterance>"],
        )
    finally:
        os.unlink(f.name)  # remove the temp image even if inference fails

    out = (resp["choices"][0].get("message", {}).get("content") or "").strip()
    print(f"DEBUG ▶ LLM returned: {out!r}")
    return out
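# The {"type": "image_url", "image_url": {"url": ...}} shape mirrors the OpenAI
# vision message format; llava-style handlers fetch image URLs with urllib, so
# a local file:// URI should resolve without needing a web server.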
# ─────────────────────────────────────────
# 4) Streamlit UI + WebRTC configuration
st.title("🎥 Real-Time Camera Captioning with SmolVLM2 (CPU)")

interval_ms = st.slider(
    "Caption every N ms", min_value=100, max_value=10000, value=3000, step=100
)

RTC_CONFIG = RTCConfiguration({
    "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
})
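# The public STUN server lets the browser and this app discover their
# externally visible addresses during the WebRTC ICE handshake; without one,
# connections from behind NAT typically fail.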
class CaptionProcessor(VideoProcessorBase):
    def __init__(self):
        self.interval = 1.0
        self.last_time = time.time()
        self.caption = ""
        # Single worker: at most one caption inference runs at a time
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.future = None
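    # recv() is called on the WebRTC worker thread for every incoming frame,
    # so it must return quickly: inference is offloaded to the executor and
    # only the most recently completed caption is drawn.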
    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        img = frame.to_ndarray(format="bgr24")
        now = time.time()

        # 1) Schedule a new inference if the interval has passed and the previous one is done
        if now - self.last_time >= self.interval:
            self.last_time = now
            # only submit if there isn't already a running task
            if self.future is None or self.future.done():
                # copy the frame so that downstream modifying code can't clash
                img_copy = img.copy()
                self.future = self.executor.submit(caption_frame, img_copy)

        # 2) If the background task finished, grab its result
        if self.future and self.future.done():
            try:
                self.caption = self.future.result()
            except Exception as e:
                self.caption = f"[error: {e}]"
            self.future = None
        # 3) Draw the last caption onto every frame immediately.
        # Hershey fonts are ASCII-only, so keep the placeholder plain text.
        cv2.putText(
            img,
            self.caption or "...thinking...",
            org=(10, img.shape[0] - 20),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=0.6,
            color=(255, 255, 255),
            thickness=2,
            lineType=cv2.LINE_AA,
        )
        return av.VideoFrame.from_ndarray(img, format="bgr24")
ctx = webrtc_streamer(
    key="smolvlm2-captioner",
    video_processor_factory=CaptionProcessor,
    rtc_configuration=RTC_CONFIG,
    media_stream_constraints={"video": True, "audio": False},
)
# Update the processor interval
if ctx.video_processor:
    ctx.video_processor.interval = interval_ms / 1000.0
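# Moving the slider triggers a Streamlit rerun, but webrtc_streamer keeps its
# processor instance alive across reruns (keyed by "smolvlm2-captioner"), so
# this assignment retunes the caption cadence without restarting the stream.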
# Placeholder for showing captions
placeholder = st.empty()
if ctx.state.playing:
    placeholder.markdown("**Caption:** _Waiting for first inference…_")
    while ctx.state.playing:
        vp = ctx.video_processor
        if vp is not None:
            txt = vp.caption or "_…thinking…_"
        else:
            txt = "_…loading…_"
        placeholder.markdown(f"**Caption:** {txt}")
        time.sleep(0.1)
else:
    st.info("▶️ Click **Start** above to begin streaming")
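# Dependencies implied by the imports above: streamlit, streamlit-webrtc,
# llama-cpp-python, opencv-python, av, huggingface_hub.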