Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import signal | |
| import sys | |
| import threading | |
| from chromadb import Collection | |
| import feedparser | |
| from modules.youtube_metadata.db import get_youtube_metadata_collection, get_indexed_channels | |
| from modules.youtube_metadata.embeddings import get_embedding | |
| import logging | |
# Module-level logger for the poller; INFO and above are emitted through the
# root handler installed by basicConfig().
logger = logging.getLogger(__name__)
logging.basicConfig()
logger.setLevel(logging.INFO)
def fetch_channel_videos_rss(channel_id, max_results=50):
    """Fetch the latest videos of a YouTube channel from its RSS feed.

    Args:
        channel_id: YouTube channel ID used to build the feed URL.
        max_results: Maximum number of feed entries to return.

    Returns:
        A list of dicts, one per feed entry, with the keys ``video_id``,
        ``video_title``, ``description``, ``published``, ``video_url``,
        ``channel_url``, ``channel_id``, ``channel_title`` and
        ``channel_author``.

    Raises:
        AttributeError: If an entry lacks ``yt_videoid``, ``title``,
            ``published`` or ``link``.
    """
    feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}"
    logger.info("fetch_channel_videos_rss: feed_url = %s", feed_url)
    feed = feedparser.parse(feed_url)

    # Channel-level fields come from the <feed> section, not from the entries.
    channel_title = getattr(feed.feed, "title", None)
    channel_url = getattr(feed.feed, "link", None)
    channel_author = getattr(feed.feed, "author", "")
    logger.info("fetch_channel_videos_rss: channel_title = %s", channel_title)

    videos = []
    for entry in feed.entries[:max_results]:
        # Try the real description fields first. The title is only a
        # last-resort fallback: if it were tried first, every description
        # would just be a copy of the title and the other fields would never
        # be consulted.
        description = (
            getattr(entry, "media_description", None)
            or getattr(entry, "summary", None)
            or getattr(getattr(entry, "title_detail", None), "value", "")
            or ""
        )
        videos.append(
            {
                "video_id": entry.yt_videoid,
                "video_title": entry.title,
                "description": description,
                "published": entry.published,
                "video_url": entry.link,
                "channel_url": channel_url,
                "channel_id": channel_id,
                "channel_title": channel_title,
                "channel_author": channel_author,
            }
        )
    return videos
def get_existing_video_ids(collection, channel_id):
    """Return the set of video IDs already stored for ``channel_id``.

    Args:
        collection: Object exposing a Chroma-style
            ``get(where=..., include=...)`` method.
        channel_id: Channel whose stored entries are looked up.

    Returns:
        A set with the ``video_id`` metadata value of every matching entry;
        empty when nothing matches.
    """
    # Only the metadata is inspected, so do not load the stored documents.
    results = collection.get(
        where={"channel_id": channel_id},
        include=["metadatas"],
    )
    # "metadatas" may be absent or None; treat both as "no entries".
    metadatas = results.get("metadatas") or []
    return {
        metadata["video_id"]
        for metadata in metadatas
        if metadata and "video_id" in metadata
    }
def filter_new_videos(videos, existing_ids):
    """Return, in order, the videos whose ``video_id`` is not in ``existing_ids``."""
    fresh = []
    for video in videos:
        if video["video_id"] in existing_ids:
            continue
        fresh.append(video)
    return fresh
def add_to_chroma(collection: Collection, new_videos):
    """Embed ``new_videos`` and add them to ``collection``.

    Each video becomes one document of the form ``"<title> - <description>"``,
    embedded with ``get_embedding``. Its metadata is the video dict plus a
    ``_global_index`` that continues from the collection's current count, and
    its id is the ``video_id``. Does nothing when ``new_videos`` is empty.

    Args:
        collection: Target Chroma collection.
        new_videos: List of video dicts containing at least ``video_id``,
            ``video_title`` and ``description``.
    """
    if not new_videos:
        return

    # Numbering of the new entries starts at the current collection size.
    base_index = collection.count()
    logger.info("new_videos = %s", new_videos)

    documents = []
    for video in new_videos:
        documents.append(f"{video['video_title']} - {video['description']}")

    embeddings = list(map(get_embedding, documents))

    metadatas = []
    for index, video in enumerate(new_videos, start=base_index):
        record = dict(video)
        record["_global_index"] = index
        metadatas.append(record)

    video_ids = [video["video_id"] for video in new_videos]

    collection.add(
        documents=documents,
        embeddings=embeddings,
        metadatas=metadatas,
        ids=video_ids,
    )
def incremental_update(channel_id):
    """Index the videos of ``channel_id`` that are not yet in the collection.

    Compares the channel's latest RSS entries with the video IDs already
    stored for that channel and adds only the missing ones.

    Args:
        channel_id: YouTube channel ID to refresh.
    """
    collection = get_youtube_metadata_collection()
    existing_ids = get_existing_video_ids(collection, channel_id)
    latest_videos = fetch_channel_videos_rss(channel_id)
    new_videos = filter_new_videos(latest_videos, existing_ids)

    if not new_videos:
        logger.info(
            "youtube_poller: incremental_update: No new videos for %s",
            channel_id,
        )
        return

    add_to_chroma(collection, new_videos)
    logger.info(
        "youtube_poller: incremental_update: Added %d new videos from %s",
        len(new_videos),
        channel_id,
    )
# Set to True (see stop_poll) to ask the polling loop to finish.
stop_flag = False


def poll_loop():
    """Poll every configured channel for new videos until ``stop_flag`` is set.

    The channel list is read once from ``get_indexed_channels()``. After each
    full pass the loop waits 10 minutes before the next one. The wait is done
    in one-second slices so that a stop request is honoured within about a
    second instead of after the full interval.
    """
    import time

    global stop_flag
    poll_interval_seconds = 600  # 10 minutes
    configured_channels = get_indexed_channels().keys()

    while not stop_flag:
        for channel_id in configured_channels:
            if stop_flag:
                break
            try:
                incremental_update(channel_id)
            except Exception:
                # A failure for one channel (network error, malformed feed,
                # storage error) must not terminate the polling thread.
                logger.exception(
                    "youtube_poller: incremental_update failed for %s",
                    channel_id,
                )

        if stop_flag:
            break

        logger.info("youtube_poller: Sleeping for 10 minutes")
        deadline = time.monotonic() + poll_interval_seconds
        while not stop_flag and time.monotonic() < deadline:
            time.sleep(1)
# Thread running poll_loop; None until start_poll() has been called.
poll_thread = None


def start_poll():
    """Create the polling thread, remember it in ``poll_thread`` and start it."""
    global poll_thread
    worker = threading.Thread(target=poll_loop)
    poll_thread = worker
    worker.start()
def stop_poll():
    """Request the polling loop to stop by setting the module-level ``stop_flag``."""
    global stop_flag
    stop_flag = True
def youtube_poll_shutdown_handler(sig, frame):
    """Signal handler: stop the poller, wait for its thread, then exit.

    Args:
        sig: Signal number passed by the ``signal`` module (unused).
        frame: Current stack frame passed by the ``signal`` module (unused).

    Raises:
        SystemExit: Always, with exit status 0.
    """
    print("Shutting down...")
    stop_poll()
    # poll_thread is None until start_poll() has run, and joining a thread
    # that was never started raises RuntimeError; only wait for a live thread.
    worker = poll_thread
    if worker is not None and worker.is_alive():
        worker.join()
    sys.exit(0)
# Stop the poller cleanly on Ctrl-C (SIGINT) and on termination requests (SIGTERM).
for _shutdown_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_shutdown_signal, youtube_poll_shutdown_handler)
del _shutdown_signal