|
|
"""
|
|
|
REST API for Portfolio Optimization
|
|
|
|
|
|
This module provides HTTP endpoints for the portfolio optimization quickstart:
|
|
|
|
|
|
Endpoints:
|
|
|
- GET /demo-data - List available demo datasets
|
|
|
- GET /demo-data/{id} - Load a specific demo dataset
|
|
|
- POST /portfolios - Submit a portfolio for optimization
|
|
|
- GET /portfolios/{id} - Get current solution for a job
|
|
|
- GET /portfolios/{id}/status - Get solving status
|
|
|
- DELETE /portfolios/{id} - Stop solving
|
|
|
- PUT /portfolios/analyze - Analyze a submitted portfolio's score
|
|
|
|
|
|
The API follows the same patterns as other SolverForge quickstarts.
|
|
|
"""
|
|
|
from fastapi import FastAPI, Request
|
|
|
from fastapi.staticfiles import StaticFiles
|
|
|
from uuid import uuid4
|
|
|
from dataclasses import replace
|
|
|
from typing import Any
|
|
|
|
|
|
from solverforge_legacy.solver import SolverManager, SolverFactory
|
|
|
|
|
|
from .domain import PortfolioOptimizationPlan, PortfolioOptimizationPlanModel
|
|
|
from .converters import plan_to_model, model_to_plan
|
|
|
from .demo_data import DemoData, generate_demo_data
|
|
|
from .solver import solver_manager, solution_manager, create_solver_config
|
|
|
from .score_analysis import ConstraintAnalysisDTO, MatchAnalysisDTO
|
|
|
|
|
|
|
|
|
app = FastAPI(
|
|
|
title="Portfolio Optimization Quickstart",
|
|
|
description="SolverForge quickstart for stock portfolio optimization",
|
|
|
docs_url='/q/swagger-ui'
|
|
|
)
|
|
|
|
|
|
|
|
|
data_sets: dict[str, PortfolioOptimizationPlan] = {}
|
|
|
solver_managers: dict[str, SolverManager] = {}
|
|
|
|
|
|
|
|
|
@app.get("/demo-data")
|
|
|
async def demo_data_list() -> list[DemoData]:
|
|
|
"""List available demo datasets."""
|
|
|
return [e for e in DemoData]
|
|
|
|
|
|
|
|
|
@app.get("/demo-data/{dataset_id}", response_model_exclude_none=True)
|
|
|
async def get_demo_data(dataset_id: str) -> PortfolioOptimizationPlanModel:
|
|
|
"""Load a specific demo dataset."""
|
|
|
demo_data = getattr(DemoData, dataset_id)
|
|
|
domain_plan = generate_demo_data(demo_data)
|
|
|
return plan_to_model(domain_plan)
|
|
|
|
|
|
|
|
|
@app.get("/portfolios/{problem_id}", response_model_exclude_none=True)
|
|
|
async def get_portfolio(problem_id: str) -> PortfolioOptimizationPlanModel:
|
|
|
"""Get current solution for a portfolio optimization job."""
|
|
|
plan = data_sets[problem_id]
|
|
|
|
|
|
manager = solver_managers.get(problem_id, solver_manager)
|
|
|
updated_plan = replace(plan, solver_status=manager.get_solver_status(problem_id))
|
|
|
return plan_to_model(updated_plan)
|
|
|
|
|
|
|
|
|
def update_portfolio(problem_id: str, plan: PortfolioOptimizationPlan) -> None:
|
|
|
"""Callback to update the stored solution as solver improves it."""
|
|
|
global data_sets
|
|
|
data_sets[problem_id] = plan
|
|
|
|
|
|
|
|
|
@app.post("/portfolios")
|
|
|
async def solve_portfolio(plan_model: PortfolioOptimizationPlanModel) -> str:
|
|
|
"""
|
|
|
Submit a portfolio for optimization.
|
|
|
|
|
|
Returns a job ID that can be used to retrieve the solution.
|
|
|
Supports custom solver configuration via solverConfig field.
|
|
|
"""
|
|
|
job_id = str(uuid4())
|
|
|
plan = model_to_plan(plan_model)
|
|
|
data_sets[job_id] = plan
|
|
|
|
|
|
|
|
|
termination_seconds = 30
|
|
|
if plan_model.solver_config and plan_model.solver_config.termination_seconds:
|
|
|
termination_seconds = plan_model.solver_config.termination_seconds
|
|
|
|
|
|
|
|
|
config = create_solver_config(termination_seconds)
|
|
|
manager: SolverManager = SolverManager.create(SolverFactory.create(config))
|
|
|
solver_managers[job_id] = manager
|
|
|
|
|
|
manager.solve_and_listen(
|
|
|
job_id,
|
|
|
plan,
|
|
|
lambda solution: update_portfolio(job_id, solution)
|
|
|
)
|
|
|
return job_id
|
|
|
|
|
|
|
|
|
@app.get("/portfolios")
|
|
|
async def list_portfolios() -> list[str]:
|
|
|
"""List all job IDs of submitted portfolios."""
|
|
|
return list(data_sets.keys())
|
|
|
|
|
|
|
|
|
@app.get("/portfolios/{problem_id}/status")
|
|
|
async def get_status(problem_id: str) -> dict[str, Any]:
|
|
|
"""Get the portfolio status and score for a given job ID."""
|
|
|
if problem_id not in data_sets:
|
|
|
raise ValueError(f"No portfolio found with ID {problem_id}")
|
|
|
|
|
|
plan = data_sets[problem_id]
|
|
|
|
|
|
manager = solver_managers.get(problem_id, solver_manager)
|
|
|
solver_status = manager.get_solver_status(problem_id)
|
|
|
|
|
|
|
|
|
selected_count = plan.get_selected_count()
|
|
|
expected_return = plan.get_expected_return() if selected_count > 0 else 0
|
|
|
|
|
|
return {
|
|
|
"score": {
|
|
|
"hardScore": plan.score.hard_score if plan.score else 0,
|
|
|
"softScore": plan.score.soft_score if plan.score else 0,
|
|
|
},
|
|
|
"solverStatus": solver_status.name,
|
|
|
"selectedCount": selected_count,
|
|
|
"expectedReturn": expected_return,
|
|
|
"sectorWeights": plan.get_sector_weights() if selected_count > 0 else {},
|
|
|
}
|
|
|
|
|
|
|
|
|
@app.delete("/portfolios/{problem_id}")
|
|
|
async def stop_solving(problem_id: str) -> PortfolioOptimizationPlanModel:
|
|
|
"""Terminate solving for a given job ID."""
|
|
|
if problem_id not in data_sets:
|
|
|
raise ValueError(f"No portfolio found with ID {problem_id}")
|
|
|
|
|
|
|
|
|
manager = solver_managers.get(problem_id, solver_manager)
|
|
|
try:
|
|
|
manager.terminate_early(problem_id)
|
|
|
except Exception as e:
|
|
|
print(f"Warning: terminate_early failed for {problem_id}: {e}")
|
|
|
|
|
|
return await get_portfolio(problem_id)
|
|
|
|
|
|
|
|
|
@app.put("/portfolios/analyze")
|
|
|
async def analyze_portfolio(request: Request) -> dict[str, Any]:
|
|
|
"""Submit a portfolio to analyze its score in detail."""
|
|
|
json_data = await request.json()
|
|
|
|
|
|
|
|
|
plan_model = PortfolioOptimizationPlanModel.model_validate(json_data)
|
|
|
|
|
|
|
|
|
domain_plan = model_to_plan(plan_model)
|
|
|
|
|
|
analysis = solution_manager.analyze(domain_plan)
|
|
|
|
|
|
|
|
|
constraints = []
|
|
|
for constraint in getattr(analysis, 'constraint_analyses', []) or []:
|
|
|
matches = [
|
|
|
MatchAnalysisDTO(
|
|
|
name=str(getattr(getattr(match, 'constraint_ref', None), 'constraint_name', "")),
|
|
|
score=str(getattr(match, 'score', "0hard/0soft")),
|
|
|
justification=str(getattr(match, 'justification', "")),
|
|
|
)
|
|
|
for match in getattr(constraint, 'matches', []) or []
|
|
|
]
|
|
|
|
|
|
constraint_dto = ConstraintAnalysisDTO(
|
|
|
name=str(getattr(constraint, 'constraint_name', "")),
|
|
|
weight=str(getattr(constraint, 'weight', "0hard/0soft")),
|
|
|
score=str(getattr(constraint, 'score', "0hard/0soft")),
|
|
|
matches=matches,
|
|
|
)
|
|
|
constraints.append(constraint_dto)
|
|
|
|
|
|
return {"constraints": [constraint.model_dump() for constraint in constraints]}
|
|
|
|
|
|
|
|
|
|
|
|
app.mount("/", StaticFiles(directory="static", html=True), name="static")
|
|
|
|