Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import asyncio | |
| import json | |
| from fastapi import HTTPException | |
| import dropbox | |
| from dropbox.files import FolderMetadata, FileMetadata | |
| from datetime import datetime, timedelta, timezone | |
| from config import SanatanConfig | |
| from db import SanatanDatabase | |
| from modules.audio.model import AudioRequest, AudioType | |
| import logging | |
| from modules.dropbox.client import dbx | |
| from fastapi import HTTPException | |
| from enum import Enum | |
| import dropbox | |
| from dropbox.files import FileMetadata | |
| from dropbox.files import FileMetadata | |
| from datetime import datetime, timezone | |
| from fastapi import HTTPException | |
| from typing import List, Set | |
| from datetime import datetime, timezone, timedelta | |
| from fastapi import HTTPException | |
| import dropbox | |
| from dropbox.files import FileMetadata | |
# Install the default root handler, then expose a module-level logger that
# emits INFO and above.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def list_dropbox_folder_hierarchy(dbx: dropbox.Dropbox, base_path: str = ""):
    """
    Build a nested dict describing the Dropbox tree rooted at base_path.

    Each subfolder becomes a key (its name) whose value is the hierarchy of
    that subfolder, built by a recursive call. Files of the current folder are
    collected under the "__files__" key as dicts with "name", "path" and
    "download_url" (a temporary link requested per file).

    Failures are reported with print() and never raised: a file whose link
    cannot be generated is skipped, and an error while listing a folder stops
    the walk of that folder and returns whatever was collected so far.

    Args:
        dbx (dropbox.Dropbox): Authenticated Dropbox client.
        base_path (str): Path inside Dropbox ("" means root).
    Returns:
        dict: Nested dict with folders -> {subfolders/files with links}.
    """
    tree = {}
    try:
        print("listing files in", base_path)
        page = dbx.files_list_folder(base_path)
        while True:
            for item in page.entries:
                if isinstance(item, FolderMetadata):
                    # Descend into the subfolder and attach its subtree.
                    tree[item.name] = list_dropbox_folder_hierarchy(
                        dbx, item.path_lower
                    )
                    continue
                if not isinstance(item, FileMetadata):
                    # Neither folder nor file metadata: nothing to record.
                    continue
                try:
                    url = dbx.files_get_temporary_link(item.path_lower).link
                    file_record = {
                        "name": item.name,
                        "path": item.path_lower,
                        "download_url": url,
                    }
                    # "__files__" is created only once a link has succeeded.
                    tree.setdefault("__files__", []).append(file_record)
                except Exception as link_err:
                    print(
                        f"Could not generate link for {item.path_lower}: {link_err}"
                    )
            if not page.has_more:
                break
            # Fetch the next page of the same folder listing.
            page = dbx.files_list_folder_continue(page.cursor)
    except Exception as e:
        print(f"Error listing folder {base_path}: {e}")
    return tree
# Lifetime of a cached temporary link: refresh before the 4h expiry.
CACHE_TTL = timedelta(hours=3, minutes=30)
# Lifetime of a cached audio-folder listing.
AUDIO_LIST_CACHE_TTL = timedelta(hours=24)

# Temporary-link cache:
# {(scripture_name, global_index, type): {"url": ..., "expiry": datetime}}
audio_cache: dict[tuple[str, int, str], dict] = {}
# Folder-listing cache:
# {scripture_name: {"entries": [...], "expiry": datetime}}
audio_list_cache: dict[str, dict] = {}
async def get_audio_urls(req: AudioRequest):
    """
    Return temporary Dropbox download URLs for the audio files of one verse.

    Files are looked up in "/{scripture_name}/audio" and selected by the
    "{global_index}-" filename prefix. The part of the filename between that
    prefix and the last "." becomes the key of the returned mapping.

    The folder listing is cached per scripture for AUDIO_LIST_CACHE_TTL and
    every generated link is cached in audio_cache for CACHE_TTL.

    The Dropbox client calls are synchronous, so they are executed through
    asyncio.to_thread to keep the event loop free while they run.

    Args:
        req (AudioRequest): Carries scripture_name and global_index.
    Returns:
        dict: {file_type: url}; the value is None for a file whose temporary
        link could not be generated.
    Raises:
        HTTPException: 404 if the audio folder cannot be listed, or if no
        file matches the requested global index.
    """
    base_path = f"/{req.scripture_name}/audio"
    prefix = f"{req.global_index}-"
    now = datetime.now(timezone.utc)

    def _list_audio_folder():
        # Runs in a worker thread: walks every page of the folder listing.
        result = dbx.files_list_folder(base_path)
        found = list(result.entries)
        while result.has_more:
            result = dbx.files_list_folder_continue(result.cursor)
            found.extend(result.entries)
        return found

    # --- 1. Use the cached folder listing when it is still fresh ---
    cache_entry = audio_list_cache.get(req.scripture_name)
    if cache_entry and cache_entry["expiry"] > now:
        entries = cache_entry["entries"]
    else:
        try:
            entries = await asyncio.to_thread(_list_audio_folder)
        except dropbox.exceptions.ApiError as err:
            raise HTTPException(
                status_code=404, detail="Audio directory not found"
            ) from err
        audio_list_cache[req.scripture_name] = {
            "entries": entries,
            "expiry": now + AUDIO_LIST_CACHE_TTL,
        }

    # --- 2. Keep only the files that belong to the requested index ---
    matching_files = [
        entry
        for entry in entries
        if isinstance(entry, FileMetadata) and entry.name.startswith(prefix)
    ]
    if not matching_files:
        raise HTTPException(status_code=404, detail="No audio files found")

    # --- 3. Generate or reuse cached URLs ---
    urls = {}
    for entry in matching_files:
        filename = entry.name
        file_type = filename[len(prefix):].rsplit(".", 1)[0]
        cache_key = (req.scripture_name, req.global_index, file_type)
        cached = audio_cache.get(cache_key)
        if cached and cached["expiry"] > now:
            urls[file_type] = cached["url"]
            continue
        file_path = f"{base_path}/{filename}"
        try:
            link_result = await asyncio.to_thread(
                dbx.files_get_temporary_link, file_path
            )
            temp_link = link_result.link
        except dropbox.exceptions.ApiError:
            # Best effort: report the file type but without a usable link.
            urls[file_type] = None
            continue
        urls[file_type] = temp_link
        audio_cache[cache_key] = {"url": temp_link, "expiry": now + CACHE_TTL}
    return urls
async def cleanup_audio_url_cache(interval_seconds: int = 600):
    """
    Evict expired entries from audio_cache forever, one sweep per interval.

    Args:
        interval_seconds (int): Pause between two sweeps, in seconds.
    """
    while True:
        cutoff = datetime.now(timezone.utc)
        # Collect the keys first; the dict must not change size while iterated.
        stale = []
        for cache_key, record in audio_cache.items():
            if record["expiry"] <= cutoff:
                stale.append(cache_key)
        for cache_key in stale:
            del audio_cache[cache_key]
        # Debug output, only when something was actually removed.
        if stale:
            print(f"Cleaned up {len(stale)} expired cache entries")
        await asyncio.sleep(interval_seconds)
| from datetime import datetime, timezone, timedelta | |
# How long a computed index list stays valid.
CACHE_TTL_2 = timedelta(minutes=10)
# In-memory cache of index lists:
# {(scripture_name, audio_type value): {"indices": [...], "expiry": datetime}}
_audio_indices_cache: dict[tuple[str, str], dict] = {}
async def get_global_indices_with_audio(scripture_name: str, audio_type: AudioType):
    """
    Returns a sorted list of global indices for a given scripture, selected by audio type.

    Audio files live in "/{scripture_name}/audio" and are expected to be named
    "<global_index>-<type>.<ext>"; entries that do not fit are ignored.

    - AudioType.any: every index that has at least one audio file.
    - AudioType.none: indices 1..total_verses that have no audio file.
    - any other type: indices having a file whose lower-cased type part
      starts with audio_type.value.

    Results are cached in memory for CACHE_TTL_2. A fresh list is returned on
    every call so callers cannot modify the cached copy.

    The Dropbox listing is synchronous and is therefore executed through
    asyncio.to_thread to keep the event loop free while it runs.

    Args:
        scripture_name (str): Scripture whose audio folder is inspected.
        audio_type (AudioType): Filter to apply.
    Returns:
        list[int]: Matching global indices in ascending order.
    Raises:
        HTTPException: 404 if the audio folder cannot be listed.
    """
    now = datetime.now(timezone.utc)
    cache_key = (scripture_name, audio_type.value)

    # Check cache
    cached = _audio_indices_cache.get(cache_key)
    if cached and cached["expiry"] > now:
        return list(cached["indices"])

    # Step 1: list all files in Dropbox folder
    base_path = f"/{scripture_name}/audio"

    def _list_audio_folder():
        # Runs in a worker thread: walks every page of the folder listing.
        result = dbx.files_list_folder(base_path)
        found = list(result.entries)
        while result.has_more:
            result = dbx.files_list_folder_continue(result.cursor)
            found.extend(result.entries)
        return found

    try:
        entries = await asyncio.to_thread(_list_audio_folder)
    except dropbox.exceptions.ApiError as err:
        raise HTTPException(
            status_code=404, detail="Audio directory not found"
        ) from err

    # Step 2: parse each filename once into (global_index, file_type)
    parsed = []
    for entry in entries:
        if not isinstance(entry, FileMetadata) or "-" not in entry.name:
            continue
        global_index_str, rest = entry.name.split("-", 1)
        try:
            global_index = int(global_index_str)
        except ValueError:
            # Prefix before the first "-" is not a number: not a verse file.
            continue
        parsed.append((global_index, rest.rsplit(".", 1)[0].strip().lower()))
    all_indices_with_audio = {global_index for global_index, _ in parsed}

    # Step 3: filter based on audio_type
    if audio_type == AudioType.none:
        db = SanatanDatabase()
        config = SanatanConfig()
        total_verses = db.count(
            collection_name=config.get_collection_name(scripture_name=scripture_name)
        )
        # NOTE(review): this assumes global indices run 1..total_verses;
        # confirm they are not 0-based.
        indices = set(range(1, total_verses + 1)) - all_indices_with_audio
    elif audio_type == AudioType.any:
        indices = all_indices_with_audio
    else:
        indices = {
            global_index
            for global_index, file_type in parsed
            if file_type.startswith(audio_type.value)
        }

    # Cache the result (sorted once); hand the caller an independent copy.
    sorted_indices = sorted(indices)
    _audio_indices_cache[cache_key] = {
        "indices": sorted_indices,
        "expiry": now + CACHE_TTL_2,
    }
    return list(sorted_indices)
if __name__ == "__main__":
    # Manual smoke test against the configured Dropbox client: count the
    # verses of one scripture that have audio of one type.
    lookup = get_global_indices_with_audio(
        scripture_name="divya_prabandham",
        audio_type=AudioType.upanyasam,
    )
    data = asyncio.run(lookup)
    print(len(data))