Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import asyncio | |
| import json | |
| from fastapi import HTTPException | |
| from pydantic import BaseModel | |
| import os | |
| import dropbox | |
| from dropbox.files import FolderMetadata, FileMetadata | |
| from dotenv import load_dotenv | |
| from datetime import datetime, timedelta, timezone | |
| load_dotenv() | |
| REFRESH_TOKEN = os.getenv("DROPBOX_REFRESH_TOKEN") | |
| APP_KEY = os.getenv("DROPBOX_APP_KEY") | |
| APP_SECRET = os.getenv("DROPBOX_APP_SECRET") | |
| if not REFRESH_TOKEN: | |
| raise Exception("DROPBOX_REFRESH_TOKEN missing") | |
| if not APP_KEY: | |
| raise Exception("APP_KEY missing") | |
| if not APP_SECRET: | |
| raise Exception("APP_SECRET missing") | |
| dbx = dropbox.Dropbox( | |
| app_key=APP_KEY, app_secret=APP_SECRET, oauth2_refresh_token=REFRESH_TOKEN | |
| ) | |
| def list_dropbox_folder_hierarchy(dbx: dropbox.Dropbox, base_path: str = ""): | |
| """ | |
| Recursively fetches the folder/file hierarchy from Dropbox starting at base_path. | |
| Includes direct temporary download links for files. | |
| Args: | |
| dbx (dropbox.Dropbox): Authenticated Dropbox client. | |
| base_path (str): Path inside Dropbox ("" means root). | |
| Returns: | |
| dict: Nested dict with folders -> {subfolders/files with links}. | |
| """ | |
| hierarchy = {} | |
| try: | |
| print("listing files in", base_path) | |
| result = dbx.files_list_folder(base_path) | |
| while True: | |
| for entry in result.entries: | |
| if isinstance(entry, FolderMetadata): | |
| # Recurse into subfolder | |
| hierarchy[entry.name] = list_dropbox_folder_hierarchy( | |
| dbx, entry.path_lower | |
| ) | |
| elif isinstance(entry, FileMetadata): | |
| try: | |
| link = dbx.files_get_temporary_link(entry.path_lower).link | |
| hierarchy.setdefault("__files__", []).append( | |
| { | |
| "name": entry.name, | |
| "path": entry.path_lower, | |
| "download_url": link, | |
| } | |
| ) | |
| except Exception as link_err: | |
| print( | |
| f"Could not generate link for {entry.path_lower}: {link_err}" | |
| ) | |
| if result.has_more: | |
| result = dbx.files_list_folder_continue(result.cursor) | |
| else: | |
| break | |
| except Exception as e: | |
| print(f"Error listing folder {base_path}: {e}") | |
| return hierarchy | |
| class AudioRequest(BaseModel): | |
| scripture_name: str | |
| global_index: int | |
| # cache = {(scripture_name, global_index, type): {"url": ..., "expiry": ...}} | |
| audio_cache: dict[tuple[str, int, str], dict] = {} | |
| CACHE_TTL = timedelta(hours=3, minutes=30) # refresh before 4h expiry | |
| async def get_audio_urls(req: AudioRequest): | |
| base_path = f"/{req.scripture_name}/audio" | |
| files_to_check = { | |
| "recitation": f"{req.global_index}-recitation.mp3", | |
| "santhai": f"{req.global_index}-santhai.mp3", | |
| } | |
| urls = {} | |
| now = datetime.now(timezone.utc) # timezone-aware UTC datetime | |
| for key, filename in files_to_check.items(): | |
| cache_key = (req.scripture_name, req.global_index, key) | |
| # Check cache first | |
| cached = audio_cache.get(cache_key) | |
| if cached and cached["expiry"] > now: | |
| urls[key] = cached["url"] | |
| continue | |
| # Generate new temporary link | |
| file_path = f"{base_path}/{filename}" | |
| try: | |
| metadata = dbx.files_get_metadata(file_path) | |
| if isinstance(metadata, FileMetadata): | |
| temp_link = dbx.files_get_temporary_link(file_path).link | |
| urls[key] = temp_link | |
| # store in cache with expiry | |
| audio_cache[cache_key] = {"url": temp_link, "expiry": now + CACHE_TTL} | |
| except dropbox.exceptions.ApiError: | |
| urls[key] = None | |
| if not any(urls.values()): | |
| raise HTTPException(status_code=404, detail="No audio files found") | |
| return urls | |
| async def cleanup_audio_url_cache(interval_seconds: int = 600): | |
| """Periodically remove expired entries from audio_cache.""" | |
| while True: | |
| now = datetime.now(timezone.utc) | |
| expired_keys = [key for key, val in audio_cache.items() if val["expiry"] <= now] | |
| for key in expired_keys: | |
| del audio_cache[key] | |
| # Debug log | |
| if expired_keys: | |
| print(f"Cleaned up {len(expired_keys)} expired cache entries") | |
| await asyncio.sleep(interval_seconds) | |
| if __name__ == "__main__": | |
| # Create Dropbox client with your access token | |
| # data = list_dropbox_folder_hierarchy(dbx, "") | |
| data = asyncio.run( | |
| get_audio_urls(AudioRequest(scripture_name="divya_prabandham", global_index=0)) | |
| ) | |
| print(json.dumps(data, indent=2)) | |