vikramvasudevan's picture
Upload folder using huggingface_hub
96fa70b verified
import signal
import sys
import threading
from chromadb import Collection
import feedparser
from modules.youtube_metadata.db import get_youtube_metadata_collection, get_indexed_channels
from modules.youtube_metadata.embeddings import get_embedding
import logging
# Configure root logging once at import time and give this module its own
# INFO-level logger used by the poller functions below.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def fetch_channel_videos_rss(channel_id, max_results=50):
    """Fetch the most recent videos for a channel via YouTube's RSS feed.

    Args:
        channel_id: YouTube channel ID used to build the feed URL.
        max_results: Upper bound on entries returned (YouTube RSS feeds
            carry only the newest ~15 entries anyway).

    Returns:
        list[dict]: one metadata dict per feed entry, combining per-video
        fields with channel-level fields taken from the feed header.
    """
    feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}"
    logger.info("fetch_channel_videos_rss: feed_url = %s", feed_url)
    feed = feedparser.parse(feed_url)
    # Channel-level fields come from the <feed> header, not the entries.
    channel_title = getattr(feed.feed, "title", None)
    channel_url = getattr(feed.feed, "link", None)
    channel_author = getattr(feed.feed, "author", "")
    logger.info("fetch_channel_videos_rss: channel_title = %s", channel_title)
    videos = []
    for entry in feed.entries[:max_results]:
        # Bug fix: the original fallback chain started with
        # entry.title_detail.value, which is the *title* text and is
        # virtually always non-empty — so "description" always duplicated
        # the title and the real description fields were never reached.
        description = (
            getattr(entry, "media_description", None)
            or getattr(entry, "summary", None)
            or ""
        )
        videos.append(
            {
                "video_id": entry.yt_videoid,
                "video_title": entry.title,
                "description": description,
                "published": entry.published,
                "video_url": entry.link,
                "channel_url": channel_url,
                "channel_id": channel_id,
                "channel_title": channel_title,
                "channel_author": channel_author,
            }
        )
    return videos
def get_existing_video_ids(collection, channel_id):
    """Return the set of video IDs already stored for *channel_id*.

    Queries the collection's metadata (no embeddings needed) and collects
    the "video_id" field of every non-empty metadata record.
    """
    # n_results: how many results to fetch; use a high number to get all entries
    results = collection.get(where={"channel_id": channel_id})
    return {
        meta["video_id"]
        for meta in results.get("metadatas", [])
        if meta and "video_id" in meta
    }
def filter_new_videos(videos, existing_ids):
    """Return only the videos whose IDs are not already in *existing_ids*."""
    fresh = []
    for video in videos:
        if video["video_id"] not in existing_ids:
            fresh.append(video)
    return fresh
def add_to_chroma(collection: Collection, new_videos):
    """Embed and insert *new_videos* into the Chroma collection.

    No-op when the list is empty. Each video dict is stored as metadata,
    augmented with a "_global_index" that continues from the collection's
    current size; the video_id doubles as the document ID.
    """
    if not new_videos:
        return
    offset = collection.count()
    logger.info("new_videos = %s", new_videos)
    docs, metas, ids = [], [], []
    for position, video in enumerate(new_videos, start=offset):
        docs.append(f"{video['video_title']} - {video['description']}")
        metas.append({**video, "_global_index": position})
        ids.append(video["video_id"])
    collection.add(
        documents=docs,
        embeddings=[get_embedding(doc) for doc in docs],
        metadatas=metas,
        ids=ids,
    )
def incremental_update(channel_id):
    """Fetch the channel's RSS feed and index any videos not yet stored."""
    collection = get_youtube_metadata_collection()
    existing_ids = get_existing_video_ids(collection, channel_id)
    latest_videos = fetch_channel_videos_rss(channel_id)
    new_videos = filter_new_videos(latest_videos, existing_ids)
    if new_videos:
        add_to_chroma(collection, new_videos)
        # Lazy %-args (file convention) instead of f-strings in log calls.
        logger.info(
            "youtube_poller: incremental_update: Added %d new videos from %s",
            len(new_videos),
            channel_id,
        )
    else:
        # Typo fix: message previously read "incremental_uddate".
        logger.info(
            "youtube_poller: incremental_update: No new videos for %s", channel_id
        )
# Cooperative shutdown flag: read by poll_loop(), set by stop_poll().
stop_flag = False
def poll_loop():
    """Poll every indexed channel for new videos until stop_flag is set.

    Sleeps ~10 minutes between cycles, but checks stop_flag every second so
    that stop_poll() followed by join() does not block for the full interval
    (the original slept 600 s in a single call, delaying shutdown by up to
    10 minutes).
    """
    import time

    global stop_flag
    # NOTE(review): the channel list is read once at startup, so channels
    # indexed after the poller starts are not picked up until restart —
    # confirm that is intended.
    configured_channels = get_indexed_channels().keys()
    while not stop_flag:
        for channel_id in configured_channels:
            if stop_flag:
                break
            incremental_update(channel_id)
        logger.info("youtube_poller: Sleeping for 10 minutes")
        # Sleep in 1-second slices so a stop request takes effect promptly.
        for _ in range(600):  # 10 minutes
            if stop_flag:
                break
            time.sleep(1)
# Handle to the background polling thread; populated by start_poll().
poll_thread = None
def start_poll():
    """Start the background polling thread (no-op if already running).

    The thread is marked daemon so the interpreter can still exit even if
    the signal-handler shutdown path never runs; calling start_poll() twice
    no longer leaks a second poller thread.
    """
    global poll_thread
    if poll_thread is not None and poll_thread.is_alive():
        return  # already polling; avoid spawning a duplicate thread
    poll_thread = threading.Thread(
        target=poll_loop, name="youtube-poller", daemon=True
    )
    poll_thread.start()
def stop_poll():
    """Signal the polling loop to exit after its current cycle."""
    global stop_flag
    stop_flag = True
def youtube_poll_shutdown_handler(sig, frame):
    """Signal handler: stop the poller, wait for it, then exit cleanly."""
    global poll_thread
    print("Shutting down...")
    stop_poll()
    # Bug fix: poll_thread is None until start_poll() is called, so the
    # unconditional join() raised AttributeError when the handler fired
    # before the poller was ever started.
    if poll_thread is not None and poll_thread.is_alive():
        poll_thread.join()
    sys.exit(0)
# Install the shutdown handler for Ctrl-C (SIGINT) and termination (SIGTERM)
# so the poller thread is stopped and joined before the process exits.
signal.signal(signal.SIGINT, youtube_poll_shutdown_handler)
signal.signal(signal.SIGTERM, youtube_poll_shutdown_handler)