Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException | |
| from typing import List, Dict, Any | |
| import os | |
| import base64 | |
| import mimetypes | |
| from app.schemas import ChatRequest, ChatResponse, TranslateRequest, TranslateResponse, UnifiedChatRequest, UnifiedChatResponse, UnifiedContext | |
| from app.utils.security import get_current_user | |
| from app.utils.helpers import medical_disclaimer, emergency_triage | |
| from app.ai_services import chat_completion, translate_text, detect_language | |
| from app.services.cameroon_data import get_cameroon_data | |
| router = APIRouter() | |
| def chat(req: ChatRequest, user=Depends(get_current_user)): | |
| try: | |
| raise HTTPException(status_code=501, detail="Historique de conversation désactivé (pas de base de données)") | |
| # Unreachable since DB disabled | |
| # Build conversation history | |
| history = [{"role": "user", "content": req.text}] | |
| # Get AI response | |
| answer = chat_completion(history, req.language) | |
| answer = f"{answer}\n\n{medical_disclaimer(req.language)}" | |
| return ChatResponse(reply=answer, conversation_id=0) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") | |
| def translate(req: TranslateRequest): | |
| out = translate_text(req.text, req.target_language) | |
| return TranslateResponse(text=out) | |
| # Gateway helpers | |
| async def handle_chat_via_gateway(payload: dict, current_user): | |
| req = ChatRequest(**payload) | |
| if current_user is None: | |
| raise HTTPException(status_code=401, detail="Authentification requise") | |
| return chat(req, user=current_user) # type: ignore | |
| async def handle_translate_via_gateway(payload: dict, current_user): | |
| req = TranslateRequest(**payload) | |
| return translate(req) | |
| async def chat_unified(req: UnifiedChatRequest): | |
| try: | |
| # Step 1 - preprocess by type | |
| processed_text = req.message | |
| detected_lang = req.language or None | |
| if req.message_type == "audio": | |
| from app.ai_services import transcribe_audio | |
| # Auto-detect after transcription if language not specified | |
| processed_text = transcribe_audio(req.message, None) | |
| if processed_text: | |
| detected_lang = detect_language(processed_text) | |
| # If transcription failed, stop here to avoid sending raw audio to GPT | |
| if not processed_text or processed_text.strip() == "": | |
| raise HTTPException(status_code=400, detail="Transcription audio non disponible. Veuillez fournir un audio plus clair.") | |
| elif req.message_type == "image": | |
| from app.ai_services import analyze_image | |
| image_input = req.message | |
| # Support local file paths by converting to data URL | |
| try: | |
| if isinstance(image_input, str) and os.path.exists(image_input): | |
| mime, _ = mimetypes.guess_type(image_input) | |
| mime = mime or "image/jpeg" | |
| with open(image_input, "rb") as f: | |
| b64 = base64.b64encode(f.read()).decode("ascii") | |
| image_input = f"data:{mime};base64,{b64}" | |
| except Exception: | |
| # Fallback to original value if any error occurs | |
| pass | |
| processed_text = analyze_image(image_input, "Analyse l'image médicale et décris les signes cliniques pertinents.") | |
| # Detect language from the analysis output if not provided | |
| if not req.language: | |
| detected_lang = detect_language(processed_text) | |
| # Step 2 - Cameroon context analysis | |
| data = get_cameroon_data() | |
| similar = data.search_similar_cases(processed_text, top_k=10) | |
| disease_counts: Dict[str, int] = {} | |
| for r in similar: | |
| if r.diagnosis: | |
| disease_counts[r.diagnosis] = disease_counts.get(r.diagnosis, 0) + 1 | |
| most_probable = max(disease_counts, key=disease_counts.get) if disease_counts else None | |
| if len(similar) >= 7: | |
| confidence = "high" | |
| elif len(similar) >= 3: | |
| confidence = "medium" | |
| else: | |
| confidence = "low" | |
| # Step 3 - General AI call | |
| history_msgs = [] | |
| if req.history: | |
| for m in req.history[-6:]: | |
| history_msgs.append({"role": "user", "content": m}) | |
| history_msgs.append({"role": "user", "content": processed_text}) | |
| # If text input and language not provided, detect it | |
| if req.message_type == "text" and (not req.language): | |
| detected_lang = detect_language(processed_text) | |
| language_to_use = detected_lang or "fr" | |
| reply = chat_completion(history_msgs, language_to_use) | |
| # Enrich with disclaimer | |
| reply = f"{reply}\n\n{medical_disclaimer(language_to_use)}" | |
| # Step 4 - Format response | |
| ctx = UnifiedContext( | |
| similar_cases_found=len(similar), | |
| most_probable_diagnosis=most_probable, | |
| confidence_level=confidence, | |
| advice="Consultez un centre de santé si les symptômes persistent ou s'aggravent." | |
| ) | |
| return UnifiedChatResponse( | |
| response=reply, | |
| context=ctx, | |
| suggested_actions=["Rechercher centres de santé", "En savoir plus"], | |
| language=language_to_use, | |
| ) | |
| except HTTPException as he: | |
| # Propagate intended HTTP errors (e.g., 400 for bad audio transcription) | |
| raise he | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") | |