import logging import os import sys from typing import Any from telegram import Update from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes from dotenv import load_dotenv from src.financial_news_requester import fetch_comp_financial_news sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) load_dotenv() TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") def format_news_for_telegram(news_json: list[dict[str, Any]]) -> str: message = "" for item in news_json: message += ( f"📰 {item.get('headline')}\n" f"📝 {item.get('summary')}\n" f"🏷️ Source: {item.get('source')}\n" f"🔗 Read more\n\n" ) return message async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("Hello! I'm your Financial Bot.") async def run_crew(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("Running ...") try: feed = fetch_comp_financial_news() logging.info(f"processed: {feed} news") await update.message.reply_text(f"Result:\n{format_news_for_telegram(feed)}", parse_mode='HTML') except Exception as e: await update.message.reply_text(f"Error: {e}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) app = ApplicationBuilder().token(TELEGRAM_TOKEN).build() app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("run", run_crew)) app.run_polling()