news_sentiment_analyzer / src /telegram_bot.py
Dmitry Beresnev
fix tg bot
541b3c5
raw
history blame
1.52 kB
import html
import logging
import os
import sys
from typing import Any

from dotenv import load_dotenv
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes

from src.financial_news_requester import fetch_comp_financial_news
# Let python-dotenv populate the process environment (typically from a local
# .env file) before the token is looked up.
load_dotenv()

# Bot API token; None when the TELEGRAM_TOKEN variable is not defined.
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN")
def format_news_for_telegram(news_json: list[dict[str, Any]]) -> str:
    """Render news items as Telegram HTML text.

    Every item contributes four lines (headline in bold, summary, source and
    a "Read more" link) followed by a blank line. Fields are read with
    ``dict.get``, so a missing key is rendered as the text ``None``.

    All field values are HTML-escaped before interpolation: the result is
    sent with ``parse_mode='HTML'``, where a raw ``<``, ``&`` or ``"`` coming
    from the feed would either break message parsing or inject markup.

    Args:
        news_json: News items; the keys read are ``headline``, ``summary``,
            ``source`` and ``url``.

    Returns:
        The concatenated HTML text, or an empty string for an empty list.
    """

    def esc(value: Any) -> str:
        # quote=True also escapes quotes so a URL cannot close the href
        # attribute early.
        return html.escape(str(value), quote=True)

    # Emoji are written as escapes so the file survives encoding round-trips:
    # U+1F4F0 newspaper, U+1F4DD memo, U+1F3F7 U+FE0F label, U+1F517 link.
    return "".join(
        f"\U0001F4F0 <b>{esc(item.get('headline'))}</b>\n"
        f"\U0001F4DD {esc(item.get('summary'))}\n"
        f"\U0001F3F7\uFE0F Source: {esc(item.get('source'))}\n"
        f"\U0001F517 <a href=\"{esc(item.get('url'))}\">Read more</a>\n\n"
        for item in news_json
    )
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the ``/start`` command by replying with a greeting.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context supplied by the application (unused).
    """
    # CommandHandler also fires for edited messages by default, in which case
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("Hello! I'm your Financial Bot.")
# Maximum length of a single Telegram text message (Bot API sendMessage).
_TELEGRAM_MAX_MESSAGE_LEN = 4096


def _batch_news_messages(
    feed: list[dict[str, Any]], limit: int = _TELEGRAM_MAX_MESSAGE_LEN
) -> list[str]:
    """Group formatted news items into message texts of at most *limit* chars.

    Items are rendered one at a time and never split, so each batch remains
    valid HTML. The first batch starts with the ``Result:`` header; joining
    all batches yields exactly the single-message text. A single item longer
    than *limit* still ends up alone in an oversized batch.
    """
    batches: list[str] = []
    current = "Result:\n"
    for item in feed:
        rendered = format_news_for_telegram([item])
        # Measuring the raw HTML is a conservative stand-in for the limit,
        # which Telegram applies after entity parsing.
        if current and len(current) + len(rendered) > limit:
            batches.append(current)
            current = ""
        current += rendered
    batches.append(current)
    return batches


async def run_crew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the ``/run` command: fetch financial news and reply with it.

    Sends a progress message, fetches the feed, and replies with the formatted
    items as HTML, split over several messages when the text would exceed
    Telegram's per-message length limit. Any failure is logged with its
    traceback and reported back to the chat.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context supplied by the application (unused).
    """
    # CommandHandler also fires for edited messages by default, in which case
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    if message is None:
        return

    await message.reply_text("Running ...")
    try:
        # NOTE(review): this call is synchronous; if it performs blocking
        # network I/O it stalls the event loop while it runs - consider
        # asyncio.to_thread after confirming it is thread-safe.
        feed = list(fetch_comp_financial_news())
        # Log the item count rather than dumping the whole feed at INFO level.
        logging.info("processed: %d news", len(feed))
        for text in _batch_news_messages(feed):
            await message.reply_text(text, parse_mode='HTML')
    except Exception as exc:
        # Top-level handler boundary: keep the traceback in the logs and tell
        # the user what went wrong.
        logging.exception("Failed to process /run command")
        await message.reply_text(f"Error: {exc}")
if __name__ == "__main__":
    # Script entry point: configure logging, build the bot application,
    # register the command handlers and poll Telegram for updates.
    logging.basicConfig(level=logging.INFO)

    # Fail fast with a readable message instead of handing None to the builder.
    if not TELEGRAM_TOKEN:
        sys.exit(
            "TELEGRAM_TOKEN is not set; define it in the environment or in a .env file."
        )

    app = ApplicationBuilder().token(TELEGRAM_TOKEN).build()
    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("run", run_crew))
    app.run_polling()