Spaces:
Runtime error
Runtime error
File size: 1,595 Bytes
67cf156 b6e22da 67cf156 b6e22da 67cf156 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
import logging
import os
import sys
from typing import Any
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
from dotenv import load_dotenv
from src.financial_news_requester import fetch_comp_financial_news
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
load_dotenv()
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
def format_news_for_telegram(news_json: list[dict[str, Any]]) -> str:
message = ""
for item in news_json:
message += (
f"π° <b>{item.get('headline')}</b>\n"
f"π {item.get('summary')}\n"
f"π·οΈ Source: {item.get('source')}\n"
f"π <a href=\"{item.get('url')}\">Read more</a>\n\n"
)
return message
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("Hello! I'm your Financial Bot.")
async def run_crew(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("Running ...")
try:
feed = fetch_comp_financial_news()
logging.info(f"processed: {feed} news")
await update.message.reply_text(f"Result:\n{format_news_for_telegram(feed)}", parse_mode='HTML')
except Exception as e:
await update.message.reply_text(f"Error: {e}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
app = ApplicationBuilder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("run", run_crew))
app.run_polling()
|