| # connectors/hubspot_connector.py | |
| import os | |
| import json | |
| from hubspot import HubSpot | |
| from hubspot.crm.contacts import ApiException as HSContactsError | |
| # Read HubSpot Private App Token | |
| HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN") | |
| client = HubSpot(access_token=HUBSPOT_TOKEN) | |
| def list_contacts(limit=100): | |
| try: | |
| results = client.crm.contacts.basic_api.get_page(limit=limit).results | |
| return json.dumps([r.to_dict() for r in results]) | |
| except HSContactsError as he: | |
| return json.dumps({"error": he.body}) | |
| except Exception as e: | |
| return json.dumps({"error": str(e)}) | |
| def list_companies(limit=100): | |
| try: | |
| results = client.crm.companies.basic_api.get_page(limit=limit).results | |
| return json.dumps([r.to_dict() for r in results]) | |
| except Exception as e: | |
| return json.dumps({"error": str(e)}) | |
| def list_deals(limit=100): | |
| try: | |
| results = client.crm.deals.basic_api.get_page(limit=limit).results | |
| return json.dumps([r.to_dict() for r in results]) | |
| except Exception as e: | |
| return json.dumps({"error": str(e)}) | |