Spaces:
Runtime error
Runtime error
Silicon Valley - Admin
Refactor server.py to replace ProxyFix with ProxyHeadersMiddleware for improved proxy handling
ed02a4f
| # server.py | |
| import asyncio | |
| import importlib.metadata | |
| import json | |
| import logging | |
| import secrets | |
| import uuid | |
| from dataclasses import dataclass, asdict | |
| from typing import Any, AsyncGenerator, Dict, Tuple, Union | |
| from quart import Quart, websocket, request, send_from_directory | |
| from quart_schema import QuartSchema, validate_request, validate_response | |
| from starlette.middleware.proxy_headers import ProxyHeadersMiddleware # Importar ProxyHeadersMiddleware de Starlette | |
| # Configuraciones | |
| TIMEOUT: int = 40 | |
| LOG_LEVEL: int = logging.DEBUG | |
| TRUSTED_HOSTS: list[str] = ["127.0.0.1", "172.18.0.3"] | |
| # Inicializaci贸n de la aplicaci贸n Quart | |
| app = Quart(__name__) | |
| QuartSchema(app) | |
| app.asgi_app = ProxyHeadersMiddleware( | |
| app.asgi_app, | |
| x_for=1, | |
| x_proto=1, | |
| x_host=1, | |
| x_port=1, | |
| x_prefix=1 | |
| ) | |
| app.logger.setLevel(LOG_LEVEL) | |
| # Excepciones personalizadas | |
| class SessionDoesNotExist(Exception): | |
| """Error al solicitar un ID de sesi贸n que no existe.""" | |
| pass | |
| class SessionAlreadyExists(Exception): | |
| """Error al crear una sesi贸n con un ID que ya existe.""" | |
| pass | |
| class ClientError(Exception): | |
| """Error cuando el cliente devuelve un error.""" | |
| def __init__(self, message: str): | |
| super().__init__(message) | |
| self.message = message | |
| # Clases de datos para solicitudes y respuestas | |
| class ClientRequest: | |
| request_id: str | |
| data: Any | |
| class ClientResponse: | |
| request_id: str | |
| error: bool | |
| data: Any | |
| class Status: | |
| status: str | |
| version: str | |
| class Session: | |
| session_id: str | |
| class Command: | |
| session_id: str | |
| command: str | |
| class Read: | |
| session_id: str | |
| path: str | |
| class Write: | |
| session_id: str | |
| path: str | |
| content: str | |
| class CommandResponse: | |
| return_code: int | |
| stdout: str | |
| stderr: str | |
| class ReadResponse: | |
| content: str | |
| class WriteResponse: | |
| size: int | |
| class ErrorResponse: | |
| error: str | |
| # Broker para manejar sesiones y comunicaciones | |
| class SessionBroker: | |
| def __init__(self): | |
| """Diccionario de session_id -> cola de mensajes pendientes por enviar al cliente""" | |
| self.sessions: Dict[str, asyncio.Queue] = {} | |
| """Diccionario de (session_id, request_id) -> futuro de la respuesta del cliente""" | |
| self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {} | |
| async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any: | |
| if session_id not in self.sessions: | |
| raise SessionDoesNotExist() | |
| request_id = str(uuid.uuid4()) | |
| loop = asyncio.get_event_loop() | |
| future = loop.create_future() | |
| self.pending_responses[(session_id, request_id)] = future | |
| await self.sessions[session_id].put(ClientRequest(request_id=request_id, data=data)) | |
| try: | |
| return await asyncio.wait_for(future, timeout) | |
| except asyncio.TimeoutError: | |
| raise | |
| finally: | |
| self.pending_responses.pop((session_id, request_id), None) | |
| async def receive_response(self, session_id: str, response: ClientResponse) -> None: | |
| key = (session_id, response.request_id) | |
| future = self.pending_responses.pop(key, None) | |
| if future and not future.done(): | |
| if response.error: | |
| future.set_exception(ClientError(message=response.data)) | |
| else: | |
| future.set_result(response.data) | |
| async def subscribe(self, session_id: str) -> AsyncGenerator[ClientRequest, None]: | |
| if session_id in self.sessions: | |
| raise SessionAlreadyExists() | |
| queue = asyncio.Queue() | |
| self.sessions[session_id] = queue | |
| try: | |
| while True: | |
| yield await queue.get() | |
| finally: | |
| del self.sessions[session_id] | |
| # Eliminar todas las respuestas pendientes de esta sesi贸n | |
| keys_to_remove = [key for key in self.pending_responses if key[0] == session_id] | |
| for key in keys_to_remove: | |
| future = self.pending_responses.pop(key) | |
| if not future.done(): | |
| future.set_exception(SessionDoesNotExist()) | |
| # Instanciaci贸n del broker | |
| broker = SessionBroker() | |
| # Funciones y rutas de la API | |
| async def _receive(session_id: str) -> None: | |
| while True: | |
| try: | |
| message = await websocket.receive() | |
| response_data = json.loads(message) | |
| client_response = ClientResponse(**response_data) | |
| app.logger.info(f"{websocket.remote_addr} - RESPONSE - {session_id} - {json.dumps(asdict(client_response))}") | |
| await broker.receive_response(session_id, client_response) | |
| except Exception as e: | |
| app.logger.error(f"Error al recibir respuesta: {e}") | |
| break | |
| async def status() -> Status: | |
| try: | |
| version = importlib.metadata.version('serverwitch-api') | |
| except importlib.metadata.PackageNotFoundError: | |
| version = "unknown" | |
| return Status(status="OK", version=version) | |
| async def session_handler(): | |
| session_id = secrets.token_hex() | |
| app.logger.info(f"{websocket.remote_addr} - NEW SESSION - {session_id}") | |
| session_message = Session(session_id=session_id) | |
| await websocket.send(json.dumps(asdict(session_message))) | |
| task = asyncio.create_task(_receive(session_id)) | |
| try: | |
| async for request_data in broker.subscribe(session_id): | |
| app.logger.info(f"{websocket.remote_addr} - REQUEST - {session_id} - {json.dumps(asdict(request_data))}") | |
| await websocket.send(json.dumps(asdict(request_data))) | |
| except SessionAlreadyExists: | |
| error_response = ErrorResponse(error="Session already exists.") | |
| await websocket.send(json.dumps(asdict(error_response))) | |
| finally: | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass | |
| async def command(data: Command) -> Tuple[Union[CommandResponse, ErrorResponse], int]: | |
| try: | |
| response_data = await broker.send_request( | |
| data.session_id, | |
| {'action': 'command', 'command': data.command}, | |
| timeout=TIMEOUT | |
| ) | |
| response = CommandResponse(**response_data) | |
| return response, 200 | |
| except SessionDoesNotExist: | |
| app.logger.warning(f"{request.remote_addr} - INVALID SESSION ID - {repr(data.session_id)}") | |
| return ErrorResponse(error='Session does not exist.'), 500 | |
| except ClientError as e: | |
| return ErrorResponse(error=e.message), 500 | |
| except asyncio.TimeoutError: | |
| return ErrorResponse(error='Timeout when waiting for client.'), 500 | |
| async def read(data: Read) -> Tuple[Union[ReadResponse, ErrorResponse], int]: | |
| try: | |
| response_data = await broker.send_request( | |
| data.session_id, | |
| {'action': 'read', 'path': data.path}, | |
| timeout=TIMEOUT | |
| ) | |
| response = ReadResponse(**response_data) | |
| return response, 200 | |
| except SessionDoesNotExist: | |
| app.logger.warning(f"{request.remote_addr} - INVALID SESSION ID - {repr(data.session_id)}") | |
| return ErrorResponse(error='Session does not exist.'), 500 | |
| except ClientError as e: | |
| return ErrorResponse(error=e.message), 500 | |
| except asyncio.TimeoutError: | |
| return ErrorResponse(error='Timeout when waiting for client.'), 500 | |
| async def write(data: Write) -> Tuple[Union[WriteResponse, ErrorResponse], int]: | |
| try: | |
| response_data = await broker.send_request( | |
| data.session_id, | |
| {'action': 'write', 'path': data.path, 'content': data.content}, | |
| timeout=TIMEOUT | |
| ) | |
| response = WriteResponse(**response_data) | |
| return response, 200 | |
| except SessionDoesNotExist: | |
| app.logger.warning(f"{request.remote_addr} - INVALID SESSION ID - {repr(data.session_id)}") | |
| return ErrorResponse(error='Session does not exist.'), 500 | |
| except ClientError as e: | |
| return ErrorResponse(error=e.message), 500 | |
| except asyncio.TimeoutError: | |
| return ErrorResponse(error='Timeout when waiting for client.'), 500 | |
| # Rutas para servir archivos est谩ticos y OpenAPI | |
| async def send_static(path): | |
| return await send_from_directory('static', path) | |
| async def openapi_spec(): | |
| return await send_from_directory('.', 'openapi.yaml') | |
| # No se necesita ejecutar nada aqu铆, ya que Hypercorn se encargar谩 de iniciar la aplicaci贸n |