# vikramvasudevan's picture
# Upload folder using huggingface_hub
# 7f4024a verified
import asyncio
import json
from fastapi import HTTPException
import dropbox
from dropbox.files import FolderMetadata, FileMetadata
from datetime import datetime, timedelta, timezone
from config import SanatanConfig
from db import SanatanDatabase
from modules.audio.model import AudioRequest, AudioType
import logging
from modules.dropbox.client import dbx
from fastapi import HTTPException
from enum import Enum
import dropbox
from dropbox.files import FileMetadata
from dropbox.files import FileMetadata
from datetime import datetime, timezone
from fastapi import HTTPException
from typing import List, Set
from datetime import datetime, timezone, timedelta
from fastapi import HTTPException
import dropbox
from dropbox.files import FileMetadata
# Apply the default logging configuration and create this module's logger,
# set to emit records at INFO level and above.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def list_dropbox_folder_hierarchy(dbx: dropbox.Dropbox, base_path: str = ""):
    """
    Recursively fetch the folder/file hierarchy from Dropbox starting at base_path.

    Every file is listed together with a direct temporary download link.

    Args:
        dbx (dropbox.Dropbox): Authenticated Dropbox client.
        base_path (str): Path inside Dropbox ("" means root).

    Returns:
        dict: Nested dict. Each subfolder name maps to the hierarchy of that
        subfolder; files of the current folder are collected under the
        "__files__" key as dicts with "name", "path" and "download_url".
        Files for which no temporary link could be generated are omitted.
        If listing fails part-way, whatever was collected so far is returned.
    """
    hierarchy = {}
    try:
        logger.info("listing files in %s", base_path)
        result = dbx.files_list_folder(base_path)
        while True:
            for entry in result.entries:
                if isinstance(entry, FolderMetadata):
                    # Recurse into subfolder
                    hierarchy[entry.name] = list_dropbox_folder_hierarchy(
                        dbx, entry.path_lower
                    )
                elif isinstance(entry, FileMetadata):
                    # Best effort: a file without a link is logged and skipped
                    # so one bad file does not abort the whole listing.
                    try:
                        link = dbx.files_get_temporary_link(entry.path_lower).link
                    except Exception as link_err:
                        logger.warning(
                            "Could not generate link for %s: %s",
                            entry.path_lower,
                            link_err,
                        )
                        continue
                    hierarchy.setdefault("__files__", []).append(
                        {
                            "name": entry.name,
                            "path": entry.path_lower,
                            "download_url": link,
                        }
                    )
            if not result.has_more:
                break
            # Listings are paginated; keep following the cursor until exhausted.
            result = dbx.files_list_folder_continue(result.cursor)
    except Exception:
        # Deliberately non-fatal: log with traceback and return partial results.
        logger.exception("Error listing folder %s", base_path)
    return hierarchy
# Temporary-link cache:
#   {(scripture_name, global_index, type): {"url": ..., "expiry": ...}}
audio_cache: dict[tuple[str, int, str], dict] = {}
# How long a cached link is reused: refresh before 4h expiry
CACHE_TTL = timedelta(hours=3, minutes=30)
# How long a cached folder listing is reused before Dropbox is queried again.
AUDIO_LIST_CACHE_TTL = timedelta(hours=24)
# Folder-listing cache:
#   {scripture_name: {"entries": [...], "expiry": datetime}}
audio_list_cache: dict[str, dict] = {}
async def get_audio_urls(req: AudioRequest):
    """Return temporary download URLs for the audio files of one verse.

    Files are looked up in the Dropbox folder ``/<scripture_name>/audio`` and
    matched by the filename prefix ``"<global_index>-"``. The folder listing
    is cached per scripture for ``AUDIO_LIST_CACHE_TTL`` and each generated
    link is cached for ``CACHE_TTL``.

    The Dropbox SDK calls are blocking, so they are run in a worker thread to
    keep the event loop responsive while they are in flight.

    Args:
        req: Request carrying ``scripture_name`` and ``global_index``.

    Returns:
        dict: Maps each file type (the filename part between the prefix and
        the last extension) to its temporary link, or to ``None`` when link
        generation failed for that file.

    Raises:
        HTTPException: 404 when listing the folder raises a Dropbox
            ``ApiError`` or when no file matches the prefix.
    """
    base_path = f"/{req.scripture_name}/audio"
    prefix = f"{req.global_index}-"
    urls = {}
    now = datetime.now(timezone.utc)

    # --- 1. Reuse the cached folder listing when it is still fresh ---
    cache_entry = audio_list_cache.get(req.scripture_name)
    if cache_entry and cache_entry["expiry"] > now:
        entries = cache_entry["entries"]
    else:
        # Fetch a fresh (paginated) listing from Dropbox.
        try:
            result = await asyncio.to_thread(dbx.files_list_folder, base_path)
            # Copy so the cached list is not the SDK result's own list.
            entries = list(result.entries)
            while result.has_more:
                result = await asyncio.to_thread(
                    dbx.files_list_folder_continue, result.cursor
                )
                entries.extend(result.entries)
        except dropbox.exceptions.ApiError as err:
            raise HTTPException(
                status_code=404, detail="Audio directory not found"
            ) from err
        audio_list_cache[req.scripture_name] = {
            "entries": entries,
            "expiry": now + AUDIO_LIST_CACHE_TTL,
        }

    # --- 2. Keep only the files that belong to the requested verse ---
    matching_files = [
        entry
        for entry in entries
        if isinstance(entry, FileMetadata) and entry.name.startswith(prefix)
    ]
    if not matching_files:
        raise HTTPException(status_code=404, detail="No audio files found")

    # --- 3. Generate links, reusing cached ones that have not expired ---
    for entry in matching_files:
        filename = entry.name
        # "<index>-<type>.<ext>" -> "<type>"
        file_type = filename[len(prefix):].rsplit(".", 1)[0]
        cache_key = (req.scripture_name, req.global_index, file_type)

        cached = audio_cache.get(cache_key)
        if cached and cached["expiry"] > now:
            urls[file_type] = cached["url"]
            continue

        file_path = f"{base_path}/{filename}"
        try:
            link_result = await asyncio.to_thread(
                dbx.files_get_temporary_link, file_path
            )
        except dropbox.exceptions.ApiError:
            # Best effort: report the type with no URL instead of failing
            # the whole request.
            urls[file_type] = None
            continue
        temp_link = link_result.link
        urls[file_type] = temp_link
        audio_cache[cache_key] = {"url": temp_link, "expiry": now + CACHE_TTL}
    return urls
async def cleanup_audio_url_cache(interval_seconds: int = 600):
    """Sweep audio_cache forever, evicting expired entries between sleeps.

    Args:
        interval_seconds: Pause between two consecutive sweeps, in seconds.
    """
    while True:
        cutoff = datetime.now(timezone.utc)

        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        stale = []
        for cache_key, record in audio_cache.items():
            if record["expiry"] <= cutoff:
                stale.append(cache_key)

        for cache_key in stale:
            audio_cache.pop(cache_key)

        # Debug log
        if len(stale) > 0:
            print(f"Cleaned up {len(stale)} expired cache entries")

        await asyncio.sleep(interval_seconds)
from datetime import datetime, timezone, timedelta
# Simple in-memory cache of computed index lists:
#   {(scripture_name, audio_type.value): {"indices": [...], "expiry": datetime}}
_audio_indices_cache: dict[tuple[str, str], dict] = {}
# How long a cached index list is served before it is recomputed.
CACHE_TTL_2 = timedelta(minutes=10)
async def get_global_indices_with_audio(scripture_name: str, audio_type: AudioType):
    """
    Return the sorted global indices of a scripture filtered by audio type.

    Audio files are read from the Dropbox folder ``/<scripture_name>/audio``
    and are expected to be named ``"<global_index>-<type>.<ext>"``; entries
    that are not files, have no "-" or have a non-integer index are ignored.

    Filtering:
        * ``AudioType.any``  - indices that have at least one audio file.
        * ``AudioType.none`` - indices in ``1..total_verses`` with no audio
          file, where ``total_verses`` comes from the scripture's collection.
        * any other type     - indices with a file whose lower-cased type
          starts with ``audio_type.value``.

    Results are cached in memory per ``(scripture_name, audio_type.value)``
    for ``CACHE_TTL_2``. The blocking Dropbox SDK calls run in a worker
    thread so the event loop is not stalled.

    Returns:
        list[int]: Sorted indices. A fresh list is returned on every call so
        callers cannot mutate the cached copy.

    Raises:
        HTTPException: 404 when listing the folder raises a Dropbox
            ``ApiError``.
    """
    now = datetime.now(timezone.utc)
    cache_key = (scripture_name, audio_type.value)

    # Check cache
    cached = _audio_indices_cache.get(cache_key)
    if cached and cached["expiry"] > now:
        return list(cached["indices"])

    # Step 1: list all files in the Dropbox folder (paginated)
    base_path = f"/{scripture_name}/audio"
    entries = []
    try:
        result = await asyncio.to_thread(dbx.files_list_folder, base_path)
        entries.extend(result.entries)
        while result.has_more:
            result = await asyncio.to_thread(
                dbx.files_list_folder_continue, result.cursor
            )
            entries.extend(result.entries)
    except dropbox.exceptions.ApiError as err:
        raise HTTPException(
            status_code=404, detail="Audio directory not found"
        ) from err

    # Step 2: parse every filename once into {global_index: {file types}}
    audio_types_by_index: dict[int, set[str]] = {}
    for entry in entries:
        if not isinstance(entry, FileMetadata):
            continue
        index_text, separator, rest = entry.name.partition("-")
        if not separator:
            continue
        try:
            global_index = int(index_text)
        except ValueError:
            continue
        file_type = rest.rsplit(".", 1)[0].strip().lower()
        audio_types_by_index.setdefault(global_index, set()).add(file_type)

    # Step 3: filter based on audio_type
    if audio_type == AudioType.none:
        db = SanatanDatabase()
        config = SanatanConfig()
        total_verses = db.count(
            collection_name=config.get_collection_name(scripture_name=scripture_name)
        )
        # NOTE(review): assumes global indices run 1..total_verses - confirm
        # against the collection's indexing scheme.
        indices = set(range(1, total_verses + 1)) - set(audio_types_by_index)
    elif audio_type == AudioType.any:
        indices = set(audio_types_by_index)
    else:
        wanted = audio_type.value
        indices = {
            global_index
            for global_index, file_types in audio_types_by_index.items()
            if any(file_type.startswith(wanted) for file_type in file_types)
        }

    # Cache the result (sorted once, shared by the cache entry and the return)
    sorted_indices = sorted(indices)
    _audio_indices_cache[cache_key] = {
        "indices": sorted_indices,
        "expiry": now + CACHE_TTL_2,
    }
    return list(sorted_indices)
if __name__ == "__main__":
    # Manual smoke test: print how many verses of one scripture have
    # upanyasam audio. The other helpers can be exercised the same way, e.g.
    #   list_dropbox_folder_hierarchy(dbx, "")
    #   asyncio.run(get_audio_urls(
    #       AudioRequest(scripture_name="divya_prabandham", global_index=0)))
    lookup = get_global_indices_with_audio(
        scripture_name="divya_prabandham",
        audio_type=AudioType.upanyasam,
    )
    found_indices = asyncio.run(lookup)
    print(len(found_indices))