"""Dropbox-backed audio helpers.

Provides a recursive folder listing with temporary download links, per-verse
audio URL lookup with in-memory caching, and a query for the global indices
that have (or lack) audio of a given type.
"""

import asyncio
import json  # noqa: F401  (kept: was imported by the original module)
import logging
from datetime import datetime, timedelta, timezone
from enum import Enum  # noqa: F401  (kept: was imported by the original module)
from typing import List, Set  # noqa: F401  (kept: was imported by the original module)

import dropbox
from dropbox.exceptions import ApiError
from dropbox.files import FileMetadata, FolderMetadata
from fastapi import HTTPException

from config import SanatanConfig
from db import SanatanDatabase
from modules.audio.model import AudioRequest, AudioType
from modules.dropbox.client import dbx

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def _iter_folder_entries(client: dropbox.Dropbox, path: str):
    """Yield every entry directly inside *path*, following pagination.

    Entries are yielded page by page, so a consumer that stops on an error
    keeps whatever it has already processed.
    """
    result = client.files_list_folder(path)
    while True:
        yield from result.entries
        if not result.has_more:
            return
        result = client.files_list_folder_continue(result.cursor)


def list_dropbox_folder_hierarchy(dbx: dropbox.Dropbox, base_path: str = "") -> dict:
    """Recursively fetch the folder/file hierarchy below ``base_path``.

    Each file is listed with a direct temporary download link.

    Args:
        dbx: Authenticated Dropbox client.
        base_path: Path inside Dropbox ("" means root).

    Returns:
        Nested dict. Each subfolder name maps to its own hierarchy dict; the
        files directly inside a folder are collected under the "__files__"
        key as dicts with "name", "path" and "download_url".

    This is best-effort: listing or link errors are logged and whatever was
    collected up to that point is returned; nothing is raised.
    """
    hierarchy = {}
    try:
        logger.info("listing files in %s", base_path)
        for entry in _iter_folder_entries(dbx, base_path):
            if isinstance(entry, FolderMetadata):
                # Recurse into the subfolder.
                hierarchy[entry.name] = list_dropbox_folder_hierarchy(
                    dbx, entry.path_lower
                )
            elif isinstance(entry, FileMetadata):
                try:
                    link = dbx.files_get_temporary_link(entry.path_lower).link
                except Exception as link_err:  # best-effort: skip this file
                    logger.warning(
                        "Could not generate link for %s: %s",
                        entry.path_lower,
                        link_err,
                    )
                    continue
                hierarchy.setdefault("__files__", []).append(
                    {
                        "name": entry.name,
                        "path": entry.path_lower,
                        "download_url": link,
                    }
                )
    except Exception:  # best-effort: return the partial hierarchy
        logger.exception("Error listing folder %s", base_path)
    return hierarchy


# cache = {(scripture_name, global_index, type): {"url": ..., "expiry": ...}}
audio_cache: dict[tuple[str, int, str], dict] = {}
CACHE_TTL = timedelta(hours=3, minutes=30)  # refresh before 4h expiry

AUDIO_LIST_CACHE_TTL = timedelta(hours=24)
audio_list_cache = {}  # {(scripture_name): {"entries": [...], "expiry": datetime}}


async def get_audio_urls(req: AudioRequest):
    """Return temporary download URLs for every audio file of one verse.

    Files are looked up in ``/<scripture_name>/audio`` and matched by the
    ``"<global_index>-"`` filename prefix. The folder listing is cached for
    ``AUDIO_LIST_CACHE_TTL`` and each generated link for ``CACHE_TTL``.

    Args:
        req: Request carrying ``scripture_name`` and ``global_index``.

    Returns:
        dict mapping the file type (the filename part between the prefix and
        the extension) to its temporary URL, or to ``None`` when Dropbox
        refused to generate a link for that file.

    Raises:
        HTTPException: 404 when the audio folder cannot be listed or when no
            file matches the requested index.
    """
    base_path = f"/{req.scripture_name}/audio"
    prefix = f"{req.global_index}-"
    now = datetime.now(timezone.utc)

    # --- 1. Use the cached folder listing when it is still fresh ---
    cache_entry = audio_list_cache.get(req.scripture_name)
    if cache_entry and cache_entry["expiry"] > now:
        entries = cache_entry["entries"]
    else:
        try:
            # The Dropbox SDK is synchronous; run it in a worker thread so
            # the event loop keeps serving other requests meanwhile.
            entries = await asyncio.to_thread(
                list, _iter_folder_entries(dbx, base_path)
            )
        except ApiError as err:
            raise HTTPException(
                status_code=404, detail="Audio directory not found"
            ) from err
        audio_list_cache[req.scripture_name] = {
            "entries": entries,
            "expiry": now + AUDIO_LIST_CACHE_TTL,
        }

    # --- 2. Filter matching files ---
    matching_files = [
        entry
        for entry in entries
        if isinstance(entry, FileMetadata) and entry.name.startswith(prefix)
    ]
    if not matching_files:
        raise HTTPException(status_code=404, detail="No audio files found")

    # --- 3. Generate or reuse cached URLs ---
    urls = {}
    for entry in matching_files:
        filename = entry.name
        file_type = filename[len(prefix):].rsplit(".", 1)[0]
        cache_key = (req.scripture_name, req.global_index, file_type)

        cached = audio_cache.get(cache_key)
        if cached and cached["expiry"] > now:
            urls[file_type] = cached["url"]
            continue

        file_path = f"{base_path}/{filename}"
        try:
            link_result = await asyncio.to_thread(
                dbx.files_get_temporary_link, file_path
            )
        except ApiError as err:
            # Keep the key in the response so callers can see the file
            # exists, but do not cache the failure.
            logger.warning("Could not generate link for %s: %s", file_path, err)
            urls[file_type] = None
            continue

        urls[file_type] = link_result.link
        audio_cache[cache_key] = {
            "url": link_result.link,
            "expiry": now + CACHE_TTL,
        }

    return urls


async def cleanup_audio_url_cache(interval_seconds: int = 600):
    """Periodically remove expired entries from ``audio_cache``.

    Runs forever; intended to be scheduled as a background task.

    Args:
        interval_seconds: Pause between two cleanup passes.
    """
    while True:
        now = datetime.now(timezone.utc)
        expired_keys = [
            key for key, val in audio_cache.items() if val["expiry"] <= now
        ]
        for key in expired_keys:
            del audio_cache[key]
        if expired_keys:
            logger.info("Cleaned up %d expired cache entries", len(expired_keys))
        await asyncio.sleep(interval_seconds)


# Simple in-memory cache:
# {(scripture_name, audio_type.value): {"indices": [...], "expiry": datetime}}
_audio_indices_cache: dict[tuple[str, str], dict] = {}
CACHE_TTL_2 = timedelta(minutes=10)


def _parse_audio_filename(name: str):
    """Split ``"<global_index>-<type>.<ext>"`` into ``(index, type)``.

    The type is stripped and lower-cased. Returns ``None`` when the name has
    no "-" or the part before the first "-" is not an integer.
    """
    index_str, separator, rest = name.partition("-")
    if not separator:
        return None
    try:
        global_index = int(index_str)
    except ValueError:
        return None
    return global_index, rest.rsplit(".", 1)[0].strip().lower()


async def get_global_indices_with_audio(scripture_name: str, audio_type: AudioType):
    """Return the sorted global indices matching ``audio_type``.

    * ``AudioType.any``  - indices that have at least one audio file.
    * ``AudioType.none`` - indices in ``1..total_verses`` without any audio.
    * any other value    - indices having a file whose type starts with
      ``audio_type.value``.

    Results are cached in memory for ``CACHE_TTL_2``. A fresh list is
    returned on every call so callers may mutate it freely.

    Raises:
        HTTPException: 404 when the audio folder cannot be listed.
    """
    now = datetime.now(timezone.utc)
    cache_key = (scripture_name, audio_type.value)

    cached = _audio_indices_cache.get(cache_key)
    if cached and cached["expiry"] > now:
        # Copy so that caller-side mutation cannot corrupt the cache.
        return list(cached["indices"])

    # Step 1: list all files in the Dropbox folder (off the event loop).
    base_path = f"/{scripture_name}/audio"
    try:
        entries = await asyncio.to_thread(
            list, _iter_folder_entries(dbx, base_path)
        )
    except ApiError as err:
        raise HTTPException(
            status_code=404, detail="Audio directory not found"
        ) from err

    # Step 2: parse every audio filename once into (global_index, file_type).
    parsed_files = []
    for entry in entries:
        if not isinstance(entry, FileMetadata):
            continue
        parsed = _parse_audio_filename(entry.name)
        if parsed is not None:
            parsed_files.append(parsed)
    all_indices_with_audio = {global_index for global_index, _ in parsed_files}

    # Step 3: filter based on audio_type.
    if audio_type == AudioType.none:
        db = SanatanDatabase()
        config = SanatanConfig()
        total_verses = db.count(
            collection_name=config.get_collection_name(scripture_name=scripture_name)
        )
        # NOTE(review): assumes global indices are 1-based - confirm against
        # the database, since index 0 would never be reported here.
        indices = set(range(1, total_verses + 1)) - all_indices_with_audio
    elif audio_type == AudioType.any:
        indices = all_indices_with_audio
    else:
        indices = {
            global_index
            for global_index, file_type in parsed_files
            if file_type.startswith(audio_type.value)
        }

    sorted_indices = sorted(indices)
    _audio_indices_cache[cache_key] = {
        "indices": sorted_indices,
        "expiry": now + CACHE_TTL_2,
    }
    return list(sorted_indices)


if __name__ == "__main__":
    # Other manual checks that can be swapped in here:
    # data = list_dropbox_folder_hierarchy(dbx, "")
    # data = asyncio.run(
    #     get_audio_urls(AudioRequest(scripture_name="divya_prabandham", global_index=0))
    # )
    data = asyncio.run(
        get_global_indices_with_audio(
            scripture_name="divya_prabandham", audio_type=AudioType.upanyasam
        )
    )
    # print(json.dumps(data, indent=2))
    print(len(data))