AI_SQL / connectors /hubspot_connector.py
mgbam's picture
Update connectors/hubspot_connector.py
defb2ee verified
raw
history blame
3.19 kB
import os
import json
import sqlite3
from mcp.server.fastmcp import FastMCP
# HubSpot
from hubspot import HubSpot
from hubspot.crm.contacts import ApiException as HSContactsError
# ────────────────────────────────────────────────────────────────────────────
mcp = FastMCP("EnterpriseData")
# ─── 1) In-memory SQLite sample ─────────────────────────────────────────────
conn = sqlite3.connect(":memory:", check_same_thread=False)
cur = conn.cursor()
cur.execute("""
CREATE TABLE Customers (
CustomerID INTEGER PRIMARY KEY AUTOINCREMENT,
Name TEXT,
Region TEXT,
LastOrderDate TEXT
)
""")
cur.executemany(
"INSERT INTO Customers (Name, Region, LastOrderDate) VALUES (?,?,?)",
[
("Acme Corp", "Northeast", "2024-12-01"),
("Beta Inc", "West", "2025-06-01"),
("Gamma Co", "Northeast", "2023-09-15"),
("Delta LLC", "South", "2025-03-20"),
("Epsilon Ltd","Northeast", "2025-07-10"),
],
)
conn.commit()
@mcp.tool()
def query_database(sql: str) -> str:
"""Run SQL against the in-memory Customers table and return JSON rows."""
try:
cur.execute(sql)
cols = [d[0] for d in cur.description or []]
rows = [dict(zip(cols, r)) for r in cur.fetchall()]
return json.dumps(rows)
except Exception as e:
return json.dumps({"error": str(e)})
# ─── 2) HubSpot tools ───────────────────────────────────────────────────────
HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
hs_client = HubSpot(access_token=HUBSPOT_TOKEN)
@mcp.tool()
def query_hubspot_contacts(limit: int = 100) -> str:
"""List HubSpot contacts."""
try:
results = hs_client.crm.contacts.basic_api.get_page(limit=limit).results
return json.dumps([r.to_dict() for r in results])
except HSContactsError as he:
return json.dumps({"error": he.body})
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def query_hubspot_companies(limit: int = 100) -> str:
"""List HubSpot companies."""
try:
results = hs_client.crm.companies.basic_api.get_page(limit=limit).results
return json.dumps([r.to_dict() for r in results])
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def query_hubspot_deals(limit: int = 100) -> str:
"""List HubSpot deals."""
try:
results = hs_client.crm.deals.basic_api.get_page(limit=limit).results
return json.dumps([r.to_dict() for r in results])
except Exception as e:
return json.dumps({"error": str(e)})
# ─── Final: Start the MCP server ────────────────────────────────────────────
if __name__ == "__main__":
mcp.run(transport="stdio")