|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import uvicorn |
|
|
import inspect |
|
|
import httpx |
|
|
|
|
|
from mcp.server.fastmcp import FastMCP |
|
|
from starlette.requests import Request |
|
|
from starlette.responses import PlainTextResponse, JSONResponse |
|
|
from starlette.middleware.base import BaseHTTPMiddleware |
|
|
|
|
|
from langchain_community.utilities import SQLDatabase |
|
|
from langchain_community.tools.sql_database.tool import QuerySQLCheckerTool |
|
|
from langchain_openai import ChatOpenAI |
|
|
|
|
|
# Shared chat model handed to the SQL query checker tool.
# OPENAI_BASE_URL is mandatory (a missing variable raises KeyError at import
# time); OPENAI_API_KEY is optional here and falls back to None.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    api_key=os.environ.get("OPENAI_API_KEY"),
    base_url=os.environ["OPENAI_BASE_URL"],
)
|
|
|
|
|
|
|
|
class HuggingFaceTokenAuthMiddleware(BaseHTTPMiddleware):
    """Gate every route except "/" behind a valid Hugging Face bearer token.

    The token taken from the ``Authorization`` header is checked against the
    Hugging Face ``whoami-v2`` endpoint. On success the decoded JSON reply is
    stored on ``request.state.hf_user`` before the request is forwarded.
    """

    async def dispatch(self, request: Request, call_next):
        """Authenticate *request* and pass it on to the next handler.

        Returns:
            The downstream response when the request is allowed; a 401
            plain-text response when the header or token is rejected; a 503
            plain-text response when the token could not be verified because
            the Hugging Face API was unreachable or sent an unreadable reply.
        """
        # The landing page is public.
        if request.url.path == "/":
            return await call_next(request)

        auth = request.headers.get("authorization")
        if not auth or not auth.lower().startswith("bearer "):
            return PlainTextResponse(
                "Missing or invalid Authorization header (expected Bearer token)",
                status_code=401,
            )
        token = auth.split(" ", 1)[1].strip()
        if not token:
            # Nothing to verify: skip the round trip to Hugging Face.
            return PlainTextResponse(
                "Missing or invalid Authorization header (expected Bearer token)",
                status_code=401,
            )

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(
                    "https://huggingface.co/api/whoami-v2",
                    headers={"Authorization": f"Bearer {token}"},
                )
        except httpx.RequestError:
            # Timeouts, DNS and connection failures are an upstream outage,
            # not a client authentication error.
            return PlainTextResponse(
                "Unable to verify Hugging Face token right now",
                status_code=503,
            )

        if resp.status_code != 200:
            return PlainTextResponse(
                "Invalid or expired Hugging Face token", status_code=401
            )

        try:
            hf_user_info = resp.json()
        except ValueError:
            # A 200 reply whose body is not valid JSON cannot be trusted.
            return PlainTextResponse(
                "Unable to verify Hugging Face token right now",
                status_code=503,
            )

        # Expose the verified identity to downstream handlers.
        request.state.hf_user = hf_user_info
        return await call_next(request)
|
|
|
|
|
|
|
|
# Metadata for every tool added through ``register_tool``; served by /tools.
tool_registry = []

# The MCP server instance that all tools and custom routes attach to.
mcp = FastMCP("Credit Card Database Server")
|
|
|
|
|
def register_tool(fn):
    """Register *fn* as an MCP tool and record its metadata.

    The function is first registered with ``mcp``; then its name, stripped
    docstring and parameter list (name, annotation rendered with ``str``,
    default value) are appended to ``tool_registry``.

    Args:
        fn: The callable to expose as a tool.

    Returns:
        *fn* itself, unchanged, so this can be used as a decorator.
    """
    mcp.tool()(fn)

    # Public spelling of the "nothing supplied" sentinel (same object as the
    # private ``inspect._empty``).
    empty = inspect.Parameter.empty
    params = [
        {
            "name": param.name,
            "type": "Any" if param.annotation is empty else str(param.annotation),
            # None is reported both for "no default" and for a None default.
            "default": None if param.default is empty else param.default,
        }
        for param in inspect.signature(fn).parameters.values()
    ]
    tool_registry.append(
        {
            "name": fn.__name__,
            "description": fn.__doc__.strip() if fn.__doc__ else "",
            "parameters": params,
        }
    )
    return fn
|
|
|
|
|
# SQLite database backing every SQL tool in this module.
credit_card_db = SQLDatabase.from_uri("sqlite:///data/ccms.db")

# LLM-backed checker used by ``sql_db_query_checker``.
query_checker_tool = QuerySQLCheckerTool(llm=llm, db=credit_card_db)
|
|
|
|
|
@mcp.custom_route("/", methods=["GET"])
async def home(request: Request) -> PlainTextResponse:
    """Serve the plain-text landing page with the server and tool-list URLs."""
    # Sent verbatim to the client, so the text is kept exactly as written.
    landing_text = """
Credit Card Database MCP Server
----
Use the following URL to connect with this server
https://pgurazada1-credit-card-database-mcp-server.hf.space/mcp/

Access the following URL for a list of tools and their documentation.
https://pgurazada1-credit-card-database-mcp-server.hf.space/tools/
"""
    return PlainTextResponse(landing_text)
|
|
|
|
|
# NOTE: the docstring below is published as the tool description (it is read
# through ``fn.__doc__`` by ``register_tool``), so it must describe what the
# tool actually returns. The value is passed through from
# ``get_usable_table_names()`` untouched; it is not joined into one string.
@register_tool
def sql_db_list_tables():
    """
    Returns the names of the usable tables in the database, one entry per table.
    """
    return credit_card_db.get_usable_table_names()
|
|
|
|
|
# NOTE: the docstring below is published as the tool description, so the
# parameter it names must match the real signature (``table_names``, a list).
@register_tool
def sql_db_schema(table_names: list[str]) -> str:
    """
    Input 'table_names' is a list of table names.
    Returns the DDL SQL schema for these tables.
    """
    return credit_card_db.get_table_info(table_names)
|
|
|
|
|
# The docstring is published as the tool description and is therefore kept
# word for word; maintainers' notes live in comments instead.
@register_tool
def sql_db_query_checker(query: str) -> str:
    """
    Input 'query' is a SQL query string.
    Checks if the query is valid.
    If the query is valid, it returns the original query.
    If the query is not valid, it returns the corrected query.
    This tool is used to ensure the query is valid before executing it.
    """
    # Delegate the whole check to the LLM-backed LangChain checker.
    checked_query = query_checker_tool.run(query)
    return checked_query
|
|
|
|
|
def _is_read_only_query(query: str) -> bool:
    """Return True when *query* is a single SELECT statement (CTE allowed).

    The check is deliberately conservative: anything that is not clearly a
    lone SELECT is refused, even if that rejects a few harmless queries
    (for example one with a ';' inside a string literal or a leading comment).
    """
    import re  # local import: ``re`` is not imported at file level

    statement = query.strip().rstrip(";").rstrip()
    # Refuse empty input and anything that looks like several statements.
    if not statement or ";" in statement:
        return False

    first_word = re.match(r"[A-Za-z]+", statement)
    if first_word is None:
        return False

    keyword = first_word.group(0).lower()
    if keyword == "select":
        return True
    if keyword == "with":
        # A CTE can also introduce a data-modifying statement, so make sure
        # no write keyword appears anywhere in the statement.
        writes = re.search(
            r"\b(?:insert|update|delete|replace\s+into)\b",
            statement,
            re.IGNORECASE,
        )
        return writes is None
    return False


# NOTE: the docstring below is published as the tool description. The
# "SELECT only" promise is enforced here because the query text comes from
# MCP clients and would otherwise be executed as-is.
@register_tool
def sql_db_query(query: str) -> str:
    """
    Input 'query' is a SQL query string.
    Executes the query (SELECT only) and returns the result.
    Any other kind of statement is rejected.
    """
    if not _is_read_only_query(query):
        raise ValueError("Only a single SELECT statement is allowed.")
    return credit_card_db.run(query)
|
|
|
|
|
@mcp.custom_route("/tools", methods=["GET"])
async def list_tools(request: Request) -> JSONResponse:
    """Serve the metadata of every registered tool as a JSON array."""
    # ``tool_registry`` is filled by ``register_tool`` while the module loads.
    registered_tools = tool_registry
    return JSONResponse(registered_tools)
|
|
|
|
|
|
|
|
# Build the ASGI application exposing the MCP server over streamable HTTP,
# then wrap it so requests pass through the Hugging Face token check.
app = mcp.streamable_http_app()
app.add_middleware(HuggingFaceTokenAuthMiddleware)
|
|
|
|
|
if __name__ == "__main__":
    # Run as a script: serve the app on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )