# sanatan_ai/modules/dropbox/discources.py
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import List, Optional
import dropbox
from modules.dropbox.client import dbx

# Logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Cache: key = folder_path, value = {"timestamp": datetime, "data": List[dict]}
_discourse_cache: dict[str, dict] = {}
CACHE_TTL = timedelta(hours=1)
FOLDER_PATH = "/_discourses"

async def fetch_discourses_from_dropbox() -> List[dict]:
    """
    Fetch all discourse JSON files from Dropbox, with in-memory caching.
    Expects files in "/_discourses/".
    """
    loop = asyncio.get_running_loop()
    folder_path = FOLDER_PATH
    # Serve from cache while the entry is still fresh
    cache_entry = _discourse_cache.get(folder_path)
    if cache_entry:
        age = datetime.now() - cache_entry["timestamp"]
        if age < CACHE_TTL:
            logger.info(f"Using cached discourses for '{folder_path}' (age={age})")
            return cache_entry["data"]
    logger.info(f"Fetching discourses from Dropbox folder '{folder_path}'")
    discourses: List[dict] = []
    try:
        # List folder contents (the SDK is synchronous, so run it in an executor)
        res = await loop.run_in_executor(None, dbx.files_list_folder, folder_path)
        entries = list(res.entries)
        # files_list_folder returns one page of entries at a time;
        # follow the cursor until the listing is exhausted
        while res.has_more:
            res = await loop.run_in_executor(
                None, dbx.files_list_folder_continue, res.cursor
            )
            entries.extend(res.entries)
        for entry in entries:
            if isinstance(entry, dropbox.files.FileMetadata) and entry.name.lower().endswith(".json"):
                metadata, fres = await loop.run_in_executor(
                    None, dbx.files_download, f"{folder_path}/{entry.name}"
                )
                discourses.append(json.loads(fres.content.decode("utf-8")))
        # Update cache
        _discourse_cache[folder_path] = {"timestamp": datetime.now(), "data": discourses}
        logger.info(f"Cached {len(discourses)} discourses for '{folder_path}'")
        return discourses
    except Exception as e:
        logger.error(f"Error fetching discourses from '{folder_path}'", exc_info=e)
        # Fall back to stale cached data if available
        if cache_entry:
            logger.warning(f"Returning stale cached discourses for '{folder_path}'")
            return cache_entry["data"]
        logger.warning(f"No cached discourses available for '{folder_path}'")
        return []
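
# Note: a refresh can be forced before the TTL expires by evicting the
# folder's cache entry, e.g. `_discourse_cache.pop(FOLDER_PATH, None)`.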

async def get_discourse_summaries(page: int = 1, per_page: int = 10) -> dict:
    """
    Returns paginated summaries: id, topic_name, thumbnail_url.
    Sorted case-insensitively by topic_name.
    """
    all_discourses = await fetch_discourses_from_dropbox()
    # Build summaries
    summaries = [
        {
            "id": d.get("id"),
            "topic_name": d.get("topic_name"),
            "thumbnail_url": d.get("thumbnail_url"),
        }
        for d in all_discourses
    ]
    summaries.sort(key=lambda x: (x.get("topic_name") or "").lower())
    # Pagination: treat an empty result set as a single empty page
    # so that page=1 is always valid
    total_items = len(summaries)
    total_pages = max((total_items + per_page - 1) // per_page, 1)
    if page < 1 or page > total_pages:
        logger.warning(f"Invalid page {page}. Must be between 1 and {total_pages}")
        return {
            "page": page,
            "per_page": per_page,
            "total_pages": total_pages,
            "total_items": total_items,
            "data": [],
        }
    start = (page - 1) * per_page
    end = start + per_page
    paginated = summaries[start:end]
    return {
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "total_items": total_items,
        "data": paginated,
    }
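
# Illustrative response shape for get_discourse_summaries(page=1, per_page=2);
# the field values below are hypothetical, not real data:
# {
#     "page": 1,
#     "per_page": 2,
#     "total_pages": 5,
#     "total_items": 9,
#     "data": [
#         {"id": 3, "topic_name": "Bhakti", "thumbnail_url": "..."},
#         {"id": 7, "topic_name": "Karma", "thumbnail_url": "..."},
#     ],
# }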

async def get_discourse_by_id(topic_id: int) -> Optional[dict]:
    """
    Fetch a single discourse JSON by topic_id from Dropbox.
    Uses in-memory caching per file.
    """
    loop = asyncio.get_running_loop()
    file_path = f"{FOLDER_PATH}/{topic_id}.json"
    # Serve from cache while the entry is still fresh
    cache_entry = _discourse_cache.get(file_path)
    if cache_entry:
        age = datetime.now() - cache_entry["timestamp"]
        if age < CACHE_TTL:
            logger.info(f"Using cached discourse for topic {topic_id} (age={age})")
            return cache_entry["data"]
    try:
        logger.info(f"Fetching discourse {topic_id} from Dropbox: {file_path}")
        metadata, res = await loop.run_in_executor(None, dbx.files_download, file_path)
        discourse = json.loads(res.content.decode("utf-8"))
        # Update cache
        _discourse_cache[file_path] = {"timestamp": datetime.now(), "data": discourse}
        return discourse
    except dropbox.exceptions.ApiError as e:
        # files_download raises ApiError (not HttpError) for a missing path
        logger.error(f"Dropbox file not found: {file_path}", exc_info=e)
        return None
    except Exception as e:
        logger.error(f"Error fetching discourse {topic_id}", exc_info=e)
        # Fall back to stale cached data if available
        if cache_entry:
            logger.warning(f"Returning stale cached discourse for topic {topic_id}")
            return cache_entry["data"]
        return None
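
# Example usage: a minimal sketch, assuming `modules.dropbox.client.dbx` is a
# configured dropbox.Dropbox instance and "/_discourses" holds "<topic_id>.json" files.
if __name__ == "__main__":
    async def _demo() -> None:
        summaries = await get_discourse_summaries(page=1, per_page=5)
        print(f"Page {summaries['page']} of {summaries['total_pages']}:")
        for item in summaries["data"]:
            print(f"  {item['id']}: {item['topic_name']}")
        # Fetch the full discourse for the first summary, if any
        if summaries["data"]:
            discourse = await get_discourse_by_id(summaries["data"][0]["id"])
            print(f"Fetched discourse keys: {sorted(discourse or {})}")

    asyncio.run(_demo())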