Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import threading | |
| import gradio as gr | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from modules.youtube_metadata.collector import fetch_all_channel_videos | |
| from modules.youtube_metadata.indexer import index_videos | |
| # global stop signal | |
| stop_event = threading.Event() | |
| MAX_BATCHES = 200 # safety cutoff | |
| def stop_sync(): | |
| """External call to stop the sync process.""" | |
| stop_event.set() | |
| def sync_channels_from_youtube(api_key, channel_urls: list, progress: gr.Progress = None): | |
| """ | |
| Sync multiple channels, yielding (progress_message, videos_indexed_in_batch) | |
| """ | |
| global stop_event | |
| stop_event.clear() | |
| total_channels = len(channel_urls) | |
| total_videos = 0 | |
| for idx, channel_url in enumerate(channel_urls, 1): | |
| if stop_event.is_set(): | |
| yield f"π Stopped before processing channel: {channel_url}", 0 | |
| break | |
| yield f"π Syncing {channel_url} ({idx}/{total_channels})", 0 | |
| # stream video-level progress from inner generator | |
| for update_message, batch_count in _refresh_single_channel(api_key, channel_url, progress): | |
| total_videos += batch_count | |
| yield update_message, batch_count | |
| yield f"β Finished syncing. Total channels: {total_channels}, total videos: {total_videos}", 0 | |
| def _refresh_single_channel(api_key, channel_url, progress): | |
| # fetch all batches first | |
| fetched_batches = list(fetch_all_channel_videos(api_key, channel_url)) | |
| all_videos = [v | {"channel_url": channel_url} for _, batch in fetched_batches for v in batch] | |
| total_videos = len(all_videos) | |
| if total_videos == 0: | |
| yield f"{channel_url}: No videos found", 0 | |
| return | |
| with ThreadPoolExecutor(max_workers=4) as executor: | |
| futures = [ | |
| executor.submit(index_videos, batch, channel_url=channel_url) | |
| for _, batch in fetched_batches | |
| ] | |
| completed_videos = 0 | |
| for f in as_completed(futures): | |
| if stop_event.is_set(): | |
| yield "π Stop requested during indexing stage", completed_videos | |
| break | |
| try: | |
| indexed_count = f.result() | |
| if indexed_count is None: | |
| indexed_count = len(all_videos) # fallback if index_videos doesn't return | |
| except Exception as e: | |
| indexed_count = 0 | |
| yield f"β οΈ Error indexing {channel_url}: {e}", completed_videos | |
| completed_videos += indexed_count | |
| pct = 100.0 * completed_videos / max(1, total_videos) | |
| if progress: | |
| progress(completed_videos / total_videos) | |
| yield f"{channel_url}: Indexed {completed_videos}/{total_videos} videos β {pct:.1f}%", completed_videos | |