news_sentiment_analyzer / src /telegram_bot.py
Dmitry Beresnev
add news and tg bot
67cf156
raw
history blame
1.51 kB
import logging
import os
from typing import Any
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
from dotenv import load_dotenv
from src.financial_news_requester import fetch_comp_financial_news
load_dotenv()
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
def format_news_for_telegram(news_json: list[dict[str, Any]]) -> str:
message = ""
for item in news_json:
message += (
f"πŸ“° <b>{item.get('headline')}</b>\n"
f"πŸ“ {item.get('summary')}\n"
f"🏷️ Source: {item.get('source')}\n"
f"πŸ”— <a href=\"{item.get('url')}\">Read more</a>\n\n"
)
return message
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("Hello! I'm your Financial Bot.")
async def run_crew(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("Running ...")
try:
feed = fetch_comp_financial_news()
logging.info(f"processed: {feed} news")
await update.message.reply_text(f"Result:\n{format_news_for_telegram(feed)}", parse_mode='HTML')
except Exception as e:
await update.message.reply_text(f"Error: {e}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
app = ApplicationBuilder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("run", run_crew))
app.run_polling()