AI_SQL / connectors /hubspot_connector.py
mgbam's picture
Update connectors/hubspot_connector.py
5c279ec verified
raw
history blame
1.09 kB
# connectors/hubspot_connector.py
import os
import json
from hubspot import HubSpot
from hubspot.crm.contacts import ApiException as HSContactsError
# HubSpot Private App token, taken from the HUBSPOT_TOKEN environment
# variable (None when the variable is unset).
HUBSPOT_TOKEN = os.environ.get("HUBSPOT_TOKEN")
# Module-wide HubSpot API client used by the list_* functions in this file.
client = HubSpot(access_token=HUBSPOT_TOKEN)
def list_contacts(limit=100):
    """Return one page of HubSpot contacts as a JSON string.

    Args:
        limit: Page size passed to the contacts ``get_page`` call.

    Returns:
        A JSON array with one object per contact on success, or a JSON
        object of the form ``{"error": ...}`` when the API call or the
        serialization fails. For HubSpot contact API errors the value is
        the response body; for any other failure it is ``str(exception)``.
    """
    from datetime import date

    def _encode_extra(value):
        # ``to_dict()`` can leave date/datetime objects (record timestamps)
        # in the payload, which ``json.dumps`` rejects by default. Emit them
        # as ISO-8601 strings; ``datetime`` is a subclass of ``date``.
        if isinstance(value, date):
            return value.isoformat()
        # Anything else is still reported as an error, as before.
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    try:
        results = client.crm.contacts.basic_api.get_page(limit=limit).results
        return json.dumps(
            [r.to_dict() for r in results], default=_encode_extra
        )
    except HSContactsError as he:
        body = he.body
        # NOTE(review): the body may arrive as raw bytes depending on the
        # client version -- confirm. Decode it so the error path itself
        # cannot fail inside json.dumps.
        if isinstance(body, (bytes, bytearray)):
            body = body.decode("utf-8", errors="replace")
        return json.dumps({"error": body})
    except Exception as e:
        return json.dumps({"error": str(e)})
def list_companies(limit=100):
    """Return one page of HubSpot companies as a JSON string.

    Args:
        limit: Page size passed to the companies ``get_page`` call.

    Returns:
        A JSON array with one object per company on success, or a JSON
        object ``{"error": str(exception)}`` when the API call or the
        serialization fails.
    """
    from datetime import date

    def _encode_extra(value):
        # ``to_dict()`` can leave date/datetime objects (record timestamps)
        # in the payload, which ``json.dumps`` rejects by default. Emit them
        # as ISO-8601 strings; ``datetime`` is a subclass of ``date``.
        if isinstance(value, date):
            return value.isoformat()
        # Anything else is still reported as an error, as before.
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    try:
        results = client.crm.companies.basic_api.get_page(limit=limit).results
        return json.dumps(
            [r.to_dict() for r in results], default=_encode_extra
        )
    except Exception as e:
        return json.dumps({"error": str(e)})
def list_deals(limit=100):
    """Return one page of HubSpot deals as a JSON string.

    Args:
        limit: Page size passed to the deals ``get_page`` call.

    Returns:
        A JSON array with one object per deal on success, or a JSON
        object ``{"error": str(exception)}`` when the API call or the
        serialization fails.
    """
    from datetime import date

    def _encode_extra(value):
        # ``to_dict()`` can leave date/datetime objects (record timestamps)
        # in the payload, which ``json.dumps`` rejects by default. Emit them
        # as ISO-8601 strings; ``datetime`` is a subclass of ``date``.
        if isinstance(value, date):
            return value.isoformat()
        # Anything else is still reported as an error, as before.
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    try:
        results = client.crm.deals.basic_api.get_page(limit=limit).results
        return json.dumps(
            [r.to_dict() for r in results], default=_encode_extra
        )
    except Exception as e:
        return json.dumps({"error": str(e)})