AI_SQL / mcp_server.py
mgbam's picture
Update mcp_server.py
7bc9e4c verified
raw
history blame
3.19 kB
"""
mcp_server.py – MCP tool server exposing:
• query_database – built-in SQLite demo
• query_hubspot – HubSpot CRM via CData Python connector
"""
from mcp.server.fastmcp import FastMCP
import sqlite3, json, os
from cdata.hubspot import connect # pip install cdata-hubspot
# ─── 1. FastMCP instance ────────────────────────────────────────────────
mcp = FastMCP("EnterpriseData")
# ─── 2. In-memory SQLite sample ─────────────────────────────────────────
# Schema for the single demo table.
_CUSTOMERS_DDL = """
CREATE TABLE Customers (
CustomerID INTEGER PRIMARY KEY AUTOINCREMENT,
Name TEXT,
Region TEXT,
LastOrderDate TEXT
)
"""

# Parameterized insert used to seed the demo rows below.
_INSERT_CUSTOMER = "INSERT INTO Customers (Name, Region, LastOrderDate) VALUES (?,?,?)"

# (Name, Region, LastOrderDate) sample rows; CustomerID is auto-assigned.
_SEED_CUSTOMERS = [
    ("Acme Corp", "Northeast", "2024-12-01"),
    ("Beta Inc", "West", "2025-06-01"),
    ("Gamma Co", "Northeast", "2023-09-15"),
    ("Delta LLC", "South", "2025-03-20"),
    ("Epsilon Ltd", "Northeast", "2025-07-10"),
]

# The database lives only in memory; check_same_thread=False lets the
# connection be used from threads other than the one that created it.
conn = sqlite3.connect(":memory:", check_same_thread=False)
cur = conn.cursor()

# Build the table, load the sample data, and persist the inserts.
cur.execute(_CUSTOMERS_DDL)
cur.executemany(_INSERT_CUSTOMER, _SEED_CUSTOMERS)
conn.commit()
# ─── 3. SQLite tool ─────────────────────────────────────────────────────
@mcp.tool()
def query_database(sql: str) -> str:
    """Run SQL against the in-memory Customers table and return JSON rows.

    The statement is passed to SQLite exactly as given (no filtering or
    parameter binding), so it may also modify the demo data.

    Args:
        sql: A single SQL statement to execute on the module-level
            connection ``conn``.

    Returns:
        A JSON array with one object per result row, keyed by column name.
        Statements that produce no result set yield ``[]``. If anything
        fails, a JSON object ``{"error": "<message>"}`` is returned instead
        of raising.
    """
    try:
        # Use a dedicated cursor per call. The connection is opened with
        # check_same_thread=False, so sharing one module-level cursor would
        # let overlapping calls overwrite each other's pending result set.
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            # description is None for statements without a result set.
            columns = [col[0] for col in cursor.description or []]
            rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            cursor.close()
        return json.dumps(rows)
    except Exception as exc:
        # Tool boundary: report the failure to the caller as JSON.
        return json.dumps({"error": str(exc)})
# ─── 4. HubSpot connector setup ─────────────────────────────────────────
# Uses CData's HubSpot Python connector (the `cdata-hubspot` package).
# Token comes from the environment; the value is None when the variable
# is not set.
HUBSPOT_TOKEN = os.environ.get("HUBSPOT_TOKEN")
# Opened once at import time and reused by query_hubspot for every call.
hs_conn = connect(PrivateAppToken=HUBSPOT_TOKEN)
# ─── 5. HubSpot tool ────────────────────────────────────────────────────
@mcp.tool()
def query_hubspot(sql: str) -> str:
    """
    Execute SQL/ODBC-style queries on HubSpot objects (Contacts, Deals, etc.).

    Example:
        SELECT firstname, lastname, email
        FROM contacts
        WHERE lifecyclestage = 'lead';

    Args:
        sql: The query text, passed unchanged to the HubSpot connector.

    Returns:
        A JSON array with one object per result row, keyed by column name.
        If anything fails (opening the cursor, running the query, or
        serializing the rows), a JSON object ``{"error": "<message>"}`` is
        returned instead of raising.
    """
    try:
        cursor = hs_conn.cursor()
        try:
            cursor.execute(sql)
            # description may be None when the statement returns no columns.
            columns = [col[0] for col in cursor.description or []]
            rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            # A new cursor is created on every call; close it so cursors do
            # not accumulate on the long-lived shared connection.
            cursor.close()
        return json.dumps(rows)
    except Exception as exc:
        # Tool boundary: report the failure to the caller as JSON.
        return json.dumps({"error": str(exc)})
# ─── 6. Run the server ───────────────────────────────────────────────────
if __name__ == "__main__":
    # Launched as a script: serve the registered tools over stdio.
    mcp.run(transport="stdio")