vikramvasudevan's picture
Upload folder using huggingface_hub
73a6587 verified
import threading
import gradio as gr
from concurrent.futures import ThreadPoolExecutor, as_completed
from modules.youtube_metadata.collector import fetch_all_channel_videos
from modules.youtube_metadata.indexer import index_videos
# global stop signal
stop_event = threading.Event()
MAX_BATCHES = 200 # safety cutoff
def stop_sync():
"""External call to stop the sync process."""
stop_event.set()
def sync_channels_from_youtube(api_key, channel_urls: list, progress: gr.Progress = None):
"""
Sync multiple channels, yielding (progress_message, videos_indexed_in_batch)
"""
global stop_event
stop_event.clear()
total_channels = len(channel_urls)
total_videos = 0
for idx, channel_url in enumerate(channel_urls, 1):
if stop_event.is_set():
yield f"πŸ›‘ Stopped before processing channel: {channel_url}", 0
break
yield f"πŸ”„ Syncing {channel_url} ({idx}/{total_channels})", 0
# stream video-level progress from inner generator
for update_message, batch_count in _refresh_single_channel(api_key, channel_url, progress):
total_videos += batch_count
yield update_message, batch_count
yield f"βœ… Finished syncing. Total channels: {total_channels}, total videos: {total_videos}", 0
def _refresh_single_channel(api_key, channel_url, progress):
# fetch all batches first
fetched_batches = list(fetch_all_channel_videos(api_key, channel_url))
all_videos = [v | {"channel_url": channel_url} for _, batch in fetched_batches for v in batch]
total_videos = len(all_videos)
if total_videos == 0:
yield f"{channel_url}: No videos found", 0
return
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [
executor.submit(index_videos, batch, channel_url=channel_url)
for _, batch in fetched_batches
]
completed_videos = 0
for f in as_completed(futures):
if stop_event.is_set():
yield "πŸ›‘ Stop requested during indexing stage", completed_videos
break
try:
indexed_count = f.result()
if indexed_count is None:
indexed_count = len(all_videos) # fallback if index_videos doesn't return
except Exception as e:
indexed_count = 0
yield f"⚠️ Error indexing {channel_url}: {e}", completed_videos
completed_videos += indexed_count
pct = 100.0 * completed_videos / max(1, total_videos)
if progress:
progress(completed_videos / total_videos)
yield f"{channel_url}: Indexed {completed_videos}/{total_videos} videos β€” {pct:.1f}%", completed_videos