Spaces:
Running
Running
Chandima Prabhath
Revert "Fix send_status_image to use correct key for uploaded file URL in response"
87a21e8
| import os | |
| import threading | |
| import requests | |
| import logging | |
| import queue | |
| import re | |
| import json | |
| import time | |
| import random | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import PlainTextResponse, JSONResponse | |
| from FLUX import generate_image | |
| from VoiceReply import generate_voice_reply | |
| from polLLM import generate_llm | |
| # Configure logging | |
| logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s") | |
| # Env vars | |
| GREEN_API_URL = os.getenv("GREEN_API_URL") | |
| GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL") | |
| GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN") | |
| GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE") | |
| WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN") | |
| BOT_STATUS_CHAT = "120363312903494448@g.us" # Chat ID for system messages | |
| image_dir = "/tmp/images" | |
| audio_dir = "/tmp/audio" | |
| # Ensure necessary directories exist | |
| os.makedirs(image_dir, exist_ok=True) | |
| os.makedirs(audio_dir, exist_ok=True) | |
| if not all([GREEN_API_URL, GREEN_API_TOKEN, GREEN_API_ID_INSTANCE, WEBHOOK_AUTH_TOKEN]): | |
| raise ValueError("Environment variables are not set properly") | |
| # Queues & inβmemory stores | |
| task_queue = queue.Queue() | |
| trivia_store = {} # chat_id β {"question":β¦, "answer":β¦} | |
| polls = {} # chat_id β {"question":β¦, "options": [β¦], "votes": {1: 0, ...}, "voters": {jid: opt}} | |
| app = FastAPI() | |
| # Global inactivity tracker | |
| last_message_time = time.time() | |
| # --- Inactivity Monitor --- | |
| def inactivity_monitor(): | |
| global last_message_time | |
| while True: | |
| time.sleep(60) # check every minute | |
| if time.time() - last_message_time >= 300: # 5 minutes inactivity | |
| if BOT_STATUS_CHAT: | |
| reminder = "β° I haven't heard from you in a while! I'm still here if you need anything." | |
| send_message("inactivity", BOT_STATUS_CHAT, reminder) | |
| last_message_time = time.time() | |
| threading.Thread(target=inactivity_monitor, daemon=True).start() | |
| # --- Background Worker --- | |
| def worker(): | |
| while True: | |
| task = task_queue.get() | |
| try: | |
| typ = task["type"] | |
| mid = task["message_id"] | |
| cid = task["chat_id"] | |
| if typ == "image": | |
| handle_image_generation(mid, cid, task["prompt"]) | |
| elif typ == "audio": | |
| response_audio(mid, cid, task["prompt"]) | |
| except Exception as e: | |
| logging.error(f"Error processing {task}: {e}") | |
| finally: | |
| task_queue.task_done() | |
| threading.Thread(target=worker, daemon=True).start() | |
| # --- send helpers --- | |
| def send_message_to_chat(to_number, message, retries=3): | |
| chat_id = to_number if to_number.endswith("@g.us") else to_number | |
| url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}" | |
| payload = {"chatId": chat_id, "message": message} | |
| for i in range(retries): | |
| try: | |
| r = requests.post(url, json=payload) | |
| r.raise_for_status() | |
| return r.json() | |
| except requests.RequestException as e: | |
| if i == retries - 1: | |
| logging.error("send_message_to_chat failed: %s", str(e)) | |
| return {"error": str(e)} | |
| def send_message(message_id, to_number, message, retries=3): | |
| chat_id = to_number if to_number.endswith("@g.us") else to_number | |
| url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}" | |
| payload = {"chatId": chat_id, "message": message, "quotedMessageId": message_id} | |
| for i in range(retries): | |
| try: | |
| r = requests.post(url, json=payload) | |
| r.raise_for_status() | |
| return r.json() | |
| except requests.RequestException as e: | |
| if i == retries - 1: | |
| return {"error": str(e)} | |
| def send_image(message_id, to_number, image_path, caption="Here you go!", retries=3): | |
| chat_id = to_number if to_number.endswith("@g.us") else to_number | |
| url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}" | |
| payload = {"chatId": chat_id, "caption": caption, "quotedMessageId": message_id} | |
| files = [("file", ("image.jpg", open(image_path, "rb"), "image/jpeg"))] | |
| for i in range(retries): | |
| try: | |
| r = requests.post(url, data=payload, files=files) | |
| r.raise_for_status() | |
| return r.json() | |
| except requests.RequestException as e: | |
| if i == retries - 1: | |
| return {"error": str(e)} | |
| def send_audio(message_id, to_number, audio_path, retries=3): | |
| logging.debug("send_audio") | |
| chat_id = to_number if to_number.endswith("@g.us") else to_number | |
| if not os.path.exists(audio_path): | |
| logging.debug(f"Missing audio: {audio_path}") | |
| url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}" | |
| payload = {"chatId": chat_id, "caption": "Here is your voice reply!", "quotedMessageId": message_id} | |
| try: | |
| with open(audio_path, "rb") as f: | |
| files = [("file", ("audio.mp3", f, "audio/mpeg"))] | |
| for i in range(retries): | |
| try: | |
| r = requests.post(url, data=payload, files=files) | |
| r.raise_for_status() | |
| return r.json() | |
| except requests.RequestException as e: | |
| if i == retries - 1: | |
| return {"error": str(e)} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| # --- New helper: send WhatsApp status image --- | |
| def send_status_image(image_path, caption="Status Update", retries=3): | |
| """ | |
| Uploads an image file using Green API's uploadFile endpoint to obtain a URL, | |
| then sends the WhatsApp status update with that URL using sendMediaStatus. | |
| """ | |
| # Step 1: Upload the image to get a file URL | |
| upload_url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/uploadFile/{GREEN_API_TOKEN}" | |
| logging.debug("Uploading image to Green API: %s", upload_url) | |
| try: | |
| with open(image_path, "rb") as f: | |
| files = {"file": f} | |
| upload_response = requests.post(upload_url, files=files) | |
| upload_response.raise_for_status() | |
| upload_data = upload_response.json() | |
| # Adjust key as per API response. Here we try "url" or "fileUrl". | |
| file_url = upload_data.get("url") or upload_data.get("fileUrl") | |
| if not file_url: | |
| logging.error("Upload failed: no file URL returned.") | |
| return {"error": "No file URL returned from upload"} | |
| logging.info("Image uploaded successfully. URL: %s", file_url) | |
| except Exception as e: | |
| logging.error("Error uploading image: %s", e) | |
| return {"error": str(e)} | |
| # Step 2: Use the file URL to send the status update | |
| status_url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMediaStatus/{GREEN_API_TOKEN}" | |
| payload = { | |
| "urlFile": file_url, | |
| "caption": caption, | |
| "fileName": os.path.basename(image_path) | |
| } | |
| for i in range(retries): | |
| try: | |
| r = requests.post(status_url, json=payload) | |
| r.raise_for_status() | |
| logging.info("Status image sent successfully.") | |
| return r.json() | |
| except requests.RequestException as e: | |
| if i == retries - 1: | |
| logging.error("send_status_image failed: %s", str(e)) | |
| return {"error": str(e)} | |
| # --- core response functions --- | |
| def response_text(message_id, chat_id, prompt): | |
| try: | |
| msg = generate_llm(prompt) | |
| send_message(message_id, chat_id, msg) | |
| except Exception: | |
| send_message(message_id, chat_id, "Error processing your request.") | |
| def response_audio(message_id, chat_id, prompt): | |
| logging.debug("response_audio prompt=%s", prompt) | |
| try: | |
| result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir) | |
| if result and result[0]: | |
| audio_path, _ = result | |
| send_audio(message_id, chat_id, audio_path) | |
| if os.path.exists(audio_path): | |
| os.remove(audio_path) | |
| else: | |
| response_text(message_id, chat_id, prompt) | |
| except Exception as e: | |
| logging.debug("audio error: %s", e) | |
| send_message(message_id, chat_id, "Error generating audio. Try again later.") | |
| def handle_image_generation(message_id, chat_id, prompt): | |
| for i in range(4): | |
| try: | |
| img, path, ret_prompt, url = generate_image(prompt, message_id, message_id, image_dir) | |
| if img: | |
| formatted_ret_prompt = "\n\n".join( | |
| f"_{paragraph.strip()}_" for paragraph in ret_prompt.split("\n\n") if paragraph.strip() | |
| ) | |
| send_image( | |
| message_id, | |
| chat_id, | |
| path, | |
| caption=f"β¨ Image ready: {url}\n>{chr(8203)} {formatted_ret_prompt}" | |
| ) | |
| else: | |
| send_message(message_id, chat_id, "Image generation failed.") | |
| break # exit on success | |
| except Exception as e: | |
| logging.error("Error in handle_image_generation: %s", e) | |
| send_message(message_id, chat_id, "Error generating image.") | |
| # --- New background thread: WhatsApp Status Updater --- | |
| def whatsapp_status_updater(): | |
| """ | |
| Every 2 minutes, this function uses the LLM to generate a random creative image prompt, | |
| generates an image from it, uploads the image to get a URL, and sends it as a WhatsApp status update. | |
| """ | |
| while True: | |
| try: | |
| # Generate a random image prompt via LLM | |
| random_prompt = generate_llm("Generate a creative and random image prompt for a WhatsApp status update.") | |
| logging.info("Random status prompt: %s", random_prompt) | |
| # Generate image from the prompt; using "status" as dummy identifiers. | |
| img, path, ret_prompt, url = generate_image(random_prompt, "status", "status", image_dir) | |
| if img and os.path.exists(path): | |
| caption = f"Status: {random_prompt}" | |
| send_status_image(path, caption) | |
| os.remove(path) | |
| else: | |
| logging.error("Image generation for status failed.") | |
| except Exception as e: | |
| logging.error("Error in whatsapp_status_updater: %s", e) | |
| # Sleep for 2 minutes | |
| time.sleep(120) | |
| # Start the WhatsApp status updater thread | |
| threading.Thread(target=whatsapp_status_updater, daemon=True).start() | |
| # --- Startup Message --- | |
| def send_startup_message(): | |
| if BOT_STATUS_CHAT: | |
| startup_msg = "π Hi! I'm Eve, your friendly AI assistant. I'm now live and ready to help with images, voice replies, and more!" | |
| resp = send_message_to_chat(BOT_STATUS_CHAT, startup_msg) | |
| if "error" in resp: | |
| logging.error("Startup message failed: %s", resp["error"]) | |
| else: | |
| logging.warning("BOT_STATUS_CHAT is not set; startup message not sent.") | |
| help_text = ( | |
| "π€ *Hi there, I'm Eve!* Here are the commands you can use:\n\n" | |
| "β’ */help* β _Show this help message._\n" | |
| "β’ */summarize <text>* β _Get a quick summary of your text._\n" | |
| "β’ */translate <language>|<text>* β _Translate text to your chosen language._\n" | |
| "β’ */joke* β _Enjoy a random, funny joke._\n" | |
| "β’ */weather <location>* β _Get the current weather for a location._\n" | |
| "β’ */inspire* β _Receive a short inspirational quote._\n" | |
| "β’ */trivia* β _Start a new trivia question._\n" | |
| "β’ */answer [your answer]* β _Reveal the trivia answer or check your answer if provided._\n" | |
| "β’ */meme <text>* β _Generate a fun meme image._\n" | |
| "β’ */poll <Question>|<Option1>|<Option2>|β¦* β _Create a poll._\n" | |
| "β’ */results* β _See current poll results._\n" | |
| "β’ */endpoll* β _End the poll and show final results._\n" | |
| "β’ */gen <prompt>* β _Generate an image from your prompt._\n\n" | |
| "Send any other text and I'll reply with a voice message. I'm here to help, so don't hesitate to ask!" | |
| ) | |
| # --- Webhook --- | |
| async def whatsapp_webhook(request: Request): | |
| global last_message_time | |
| last_message_time = time.time() | |
| auth = request.headers.get("Authorization", "").strip() | |
| if auth != f"Bearer {WEBHOOK_AUTH_TOKEN}": | |
| raise HTTPException(403, "Unauthorized") | |
| try: | |
| data = await request.json() | |
| chat_id = data["senderData"]["chatId"] | |
| print(f"New message from chat ID: {chat_id}") | |
| except: | |
| return JSONResponse({"error": "Invalid JSON"}, status_code=400) | |
| if data.get("typeWebhook") != "incomingMessageReceived": | |
| return {"success": True} | |
| logging.debug("recv: %s", data) | |
| sd = data["senderData"] | |
| chat = sd["chatId"] | |
| mid = data["idMessage"] | |
| sender_jid = sd.get("sender") | |
| md = data.get("messageData", {}) | |
| if md.get("typeMessage") == "quotedMessage" or "quotedMessage" in md: | |
| logging.debug("skip native quotedMessage") | |
| return {"success": True} | |
| if "textMessageData" in md: | |
| body = md["textMessageData"].get("textMessage", "").strip() | |
| ctx = md["textMessageData"].get("contextInfo", {}) | |
| elif "extendedTextMessageData" in md: | |
| body = md["extendedTextMessageData"].get("text", "").strip() | |
| ctx = md["extendedTextMessageData"].get("contextInfo", {}) | |
| else: | |
| return {"success": True} | |
| if ctx.get("mentionedJid") or ctx.get("mentionedJidList"): | |
| return {"success": True} | |
| if chat.endswith("@g.us") and re.search(r"@\d+", body): | |
| return {"success": True} | |
| low = body.lower() | |
| # --- New Commands --- | |
| if low == "/help": | |
| send_message(mid, chat, help_text) | |
| return {"success": True} | |
| if low.startswith("/summarize "): | |
| txt = body[len("/summarize "):].strip() | |
| summary = generate_llm(f"Summarize this text in one short paragraph:\n\n{txt}") | |
| send_message(mid, chat, summary) | |
| return {"success": True} | |
| if low.startswith("/translate "): | |
| part = body[len("/translate "):] | |
| if "|" not in part: | |
| send_message(mid, chat, "Please use `/translate <language>|<text>`") | |
| else: | |
| lang, txt = part.split("|", 1) | |
| resp = generate_llm(f"Translate the following into {lang.strip()}:\n\n{txt.strip()}") | |
| send_message(mid, chat, resp) | |
| return {"success": True} | |
| if low == "/joke": | |
| try: | |
| joke = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json() | |
| send_message(mid, chat, f"{joke['setup']}\n\n{joke['punchline']}") | |
| except: | |
| send_message(mid, chat, generate_llm("Tell me a short, funny joke.")) | |
| return {"success": True} | |
| if low.startswith("/weather "): | |
| loc = body[len("/weather "):].strip().replace(" ", "+") | |
| try: | |
| w = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text | |
| send_message(mid, chat, w) | |
| except: | |
| send_message(mid, chat, "Could not fetch weather.") | |
| return {"success": True} | |
| if low == "/inspire": | |
| quote = generate_llm("Give me a short inspirational quote.") | |
| send_message(mid, chat, f"β¨ {quote}") | |
| return {"success": True} | |
| # TRIVIA | |
| if low == "/trivia": | |
| randomSeed = random.randint(0, 9999999) | |
| raw = generate_llm( | |
| (f"Generate a unique and random trivia question and answer based on this random seed {randomSeed} to make it unique in JSON format. " | |
| "The output should strictly follow this example format without extra text:\n\n" | |
| "{\"question\": \"What is the capital of France?\", \"answer\": \"Paris\"}") | |
| ) | |
| def extract_json(text): | |
| import re | |
| match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL) | |
| if match: | |
| return match.group(1) | |
| return text | |
| try: | |
| json_text = extract_json(raw) | |
| obj = json.loads(json_text) | |
| if "question" in obj and "answer" in obj: | |
| trivia_store[chat] = obj | |
| send_message(mid, chat, f"β {obj['question']}\nReply with `/answer` followed by your answer (if you want to check it) or just `/answer` to reveal the correct answer.") | |
| else: | |
| raise ValueError("Missing expected keys.") | |
| except Exception as e: | |
| logging.error("Trivia JSON parse error: %s, raw response: %s", e, raw) | |
| send_message(mid, chat, "Failed to generate trivia. Please try again.") | |
| return {"success": True} | |
| if low.startswith("/answer"): | |
| user_response = body[len("/answer"):].strip() | |
| if chat in trivia_store: | |
| correct_answer = trivia_store[chat]["answer"] | |
| question = trivia_store[chat]["question"] | |
| if user_response: | |
| eval_prompt = ( | |
| f"Question: {question}\n" | |
| f"Correct Answer: {correct_answer}\n" | |
| f"User Answer: {user_response}\n" | |
| "Is the user's answer correct? Respond with 'Correct' if yes, or 'Incorrect' if not, and explain briefly." | |
| ) | |
| verdict = generate_llm(eval_prompt) | |
| send_message(mid, chat, f"π‘ {verdict}") | |
| else: | |
| send_message(mid, chat, f"π‘ Answer: {correct_answer}") | |
| trivia_store.pop(chat, None) | |
| else: | |
| send_message(mid, chat, "No active trivia. Send `/trivia` to start one.") | |
| return {"success": True} | |
| if low.startswith("/meme "): | |
| txt = body[len("/meme "):].strip() | |
| send_message(mid, chat, "π¨ Generating your meme...") | |
| task_queue.put({ | |
| "type": "image", | |
| "message_id": mid, | |
| "chat_id": chat, | |
| "prompt": f"meme template with text: {txt}" | |
| }) | |
| return {"success": True} | |
| if low.startswith("/poll "): | |
| parts = body[len("/poll "):].split("|") | |
| if len(parts) < 3: | |
| send_message(mid, chat, "Please use `/poll Question|Option1|Option2|...`") | |
| else: | |
| q = parts[0].strip() | |
| opts = [p.strip() for p in parts[1:]] | |
| votes = {i+1: 0 for i in range(len(opts))} | |
| polls[chat] = {"question": q, "options": opts, "votes": votes, "voters": {}} | |
| txt = f"π *Poll:* {q}\n" + "\n".join( | |
| f"{i+1}. {opt}" for i, opt in enumerate(opts) | |
| ) + "\n\nReply with the *option number* to vote." | |
| send_message(mid, chat, txt) | |
| return {"success": True} | |
| if chat in polls and body.isdigit(): | |
| n = int(body) | |
| p = polls[chat] | |
| if 1 <= n <= len(p["options"]): | |
| prev = p["voters"].get(sender_jid) | |
| if prev: | |
| p["votes"][prev] -= 1 | |
| p["votes"][n] += 1 | |
| p["voters"][sender_jid] = n | |
| send_message(mid, chat, f"β Vote recorded: {p['options'][n-1]}") | |
| return {"success": True} | |
| if low == "/results": | |
| if chat in polls: | |
| p = polls[chat] | |
| txt = f"π *Results:* {p['question']}\n" + "\n".join( | |
| f"{i}. {opt}: {p['votes'][i]}" for i, opt in enumerate([""] + p["options"]) if i > 0 | |
| ) | |
| send_message(mid, chat, txt) | |
| else: | |
| send_message(mid, chat, "No active poll.") | |
| return {"success": True} | |
| if low == "/endpoll": | |
| if chat in polls: | |
| p = polls.pop(chat) | |
| txt = f"π *Final Results:* {p['question']}\n" + "\n".join( | |
| f"{i}. {opt}: {p['votes'][i]}" for i, opt in enumerate([""] + p["options"]) if i > 0 | |
| ) | |
| send_message(mid, chat, txt) | |
| else: | |
| send_message(mid, chat, "No active poll.") | |
| return {"success": True} | |
| if low.startswith("/gen"): | |
| prompt = body[len("/gen"):].strip() | |
| if not prompt: | |
| send_message(mid, chat, "Please use `/gen <prompt>` to generate an image.") | |
| else: | |
| send_message(mid, chat, "β¨ Your image is being generated. Please wait...") | |
| task_queue.put({ | |
| "type": "image", | |
| "message_id": mid, | |
| "chat_id": chat, | |
| "prompt": prompt | |
| }) | |
| return {"success": True} | |
| task_queue.put({ | |
| "type": "audio", | |
| "message_id": mid, | |
| "chat_id": chat, | |
| "prompt": body | |
| }) | |
| return {"success": True} | |
| def index(): | |
| return "Server is running!" | |
| if __name__ == "__main__": | |
| send_startup_message() | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |