Spaces:
Running
Running
Chandima Prabhath
Update weather API endpoint and enhance image generation message for clarity
07534df
| import os | |
| import threading | |
| import requests | |
| import logging | |
| import queue | |
| import re | |
| import json | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import PlainTextResponse, JSONResponse | |
| from FLUX import generate_image | |
| from VoiceReply import generate_voice_reply | |
| from llm import generate_llm | |
# Logging setup
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")

# Settings read from the environment
GREEN_API_URL = os.environ.get("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.environ.get("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.environ.get("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.environ.get("GREEN_API_ID_INSTANCE")
WEBHOOK_AUTH_TOKEN = os.environ.get("WEBHOOK_AUTH_TOKEN")

# Directories handed to the image and voice generators
image_dir = "/tmp/images"
audio_dir = "/tmp/audio"

# Refuse to start when a required setting is missing or empty.
# GREEN_API_MEDIA_URL has a default, so it is not part of this check.
if not (GREEN_API_URL and GREEN_API_TOKEN and GREEN_API_ID_INSTANCE and WEBHOOK_AUTH_TOKEN):
    raise ValueError("Environment variables are not set properly")

# Queues & in-memory stores
task_queue = queue.Queue()  # work items consumed by the background worker
trivia_store = {}  # chat_id -> {"question": ..., "answer": ...}
polls = {}  # chat_id -> {"question": ..., "options": [...], "votes": {1: 0, ...}, "voters": {jid: opt}}

app = FastAPI()
# Background worker
def worker():
    """Consume tasks from ``task_queue`` forever and dispatch them by type.

    Each task is a dict with ``type``, ``message_id``, ``chat_id`` and
    ``prompt`` keys. ``"image"`` tasks are passed to
    ``handle_image_generation`` and ``"audio"`` tasks to ``response_audio``.
    Any exception raised while handling a task is logged (with traceback)
    and swallowed so that one bad task cannot stop the loop.
    """
    while True:
        task = task_queue.get()
        try:
            typ = task["type"]
            mid = task["message_id"]
            cid = task["chat_id"]
            if typ == "image":
                handle_image_generation(mid, cid, task["prompt"])
            elif typ == "audio":
                response_audio(mid, cid, task["prompt"])
            else:
                # Previously dropped without a trace; make it visible.
                logging.warning("Ignoring task with unknown type %r: %s", typ, task)
        except Exception:
            logging.exception("Error processing %s", task)
        finally:
            # Always balance the get() so task_queue.join() can complete.
            task_queue.task_done()


# Daemon thread: it does not keep the process alive at shutdown.
threading.Thread(target=worker, daemon=True).start()
# --- send helpers ---
def send_message(message_id, to_number, message, retries=3, timeout=30):
    """Send a text message through the Green API ``sendMessage`` endpoint.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Chat ID; it is used unchanged as the ``chatId`` field.
        message: Text body to send.
        retries: Maximum number of HTTP attempts.
        timeout: Seconds to wait for each HTTP attempt before giving up,
            so a stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON response on success, otherwise
        ``{"error": "<reason>"}`` once every attempt has failed (or when
        ``retries`` is less than 1 and no attempt was made).
    """
    url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
    payload = {"chatId": to_number, "message": message, "quotedMessageId": message_id}
    last_error = "no send attempts were made (retries < 1)"
    for attempt in range(1, retries + 1):
        try:
            r = requests.post(url, json=payload, timeout=timeout)
            r.raise_for_status()
            return r.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on older requests versions.
            last_error = str(e)
            # Log only the exception type: its text can contain the request
            # URL, which embeds the API token.
            logging.warning(
                "sendMessage attempt %d/%d failed: %s", attempt, retries, type(e).__name__
            )
    return {"error": last_error}
def send_image(message_id, to_number, image_path, caption="Here you go!", retries=3, timeout=60):
    """Upload an image file to a chat via the Green API ``sendFileByUpload`` endpoint.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Chat ID; it is used unchanged as the ``chatId`` field.
        image_path: Path of the image file to upload (sent as ``image.jpg``).
        caption: Caption attached to the image.
        retries: Maximum number of HTTP attempts.
        timeout: Seconds to wait for each HTTP attempt.

    Returns:
        The decoded JSON response on success, otherwise
        ``{"error": "<reason>"}`` once every attempt has failed (or when
        ``retries`` is less than 1 and no attempt was made).

    Raises:
        OSError: If ``image_path`` cannot be opened (unchanged from the
            previous behaviour; callers rely on it to report a failure).
    """
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {"chatId": to_number, "caption": caption, "quotedMessageId": message_id}
    last_error = "no send attempts were made (retries < 1)"
    # The context manager closes the handle on every exit path.
    with open(image_path, "rb") as image_file:
        for attempt in range(1, retries + 1):
            # A failed attempt may already have consumed the stream; rewind
            # so a retry uploads the whole file instead of an empty body.
            image_file.seek(0)
            files = [("file", ("image.jpg", image_file, "image/jpeg"))]
            try:
                r = requests.post(url, data=payload, files=files, timeout=timeout)
                r.raise_for_status()
                return r.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a non-JSON body on older requests versions.
                last_error = str(e)
                # Log only the exception type: its text can contain the
                # request URL, which embeds the API token.
                logging.warning(
                    "sendFileByUpload (image) attempt %d/%d failed: %s",
                    attempt, retries, type(e).__name__,
                )
    return {"error": last_error}
def send_audio(message_id, to_number, audio_path, retries=3, timeout=60):
    """Upload an audio file to a chat via the Green API ``sendFileByUpload`` endpoint.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Chat ID; it is used unchanged as the ``chatId`` field.
        audio_path: Path of the audio file to upload (sent as ``audio.mp3``).
        retries: Maximum number of HTTP attempts.
        timeout: Seconds to wait for each HTTP attempt.

    Returns:
        The decoded JSON response on success, otherwise
        ``{"error": "<reason>"}`` when the file is missing or unreadable,
        when every attempt has failed, or when ``retries`` is less than 1.
    """
    logging.debug("send_audio")
    if not os.path.exists(audio_path):
        # Stop here instead of falling through to an open() that must fail.
        logging.debug(f"Missing audio: {audio_path}")
        return {"error": f"Missing audio: {audio_path}"}
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {"chatId": to_number, "caption": "Here is your voice reply!", "quotedMessageId": message_id}
    last_error = "no send attempts were made (retries < 1)"
    try:
        with open(audio_path, "rb") as f:
            for attempt in range(1, retries + 1):
                # A failed attempt may already have consumed the stream;
                # rewind so a retry uploads the whole file.
                f.seek(0)
                files = [("file", ("audio.mp3", f, "audio/mpeg"))]
                try:
                    r = requests.post(url, data=payload, files=files, timeout=timeout)
                    r.raise_for_status()
                    return r.json()
                except (requests.RequestException, ValueError) as e:
                    # ValueError covers a non-JSON body on older requests versions.
                    last_error = str(e)
                    # Log only the exception type: its text can contain the
                    # request URL, which embeds the API token.
                    logging.warning(
                        "sendFileByUpload (audio) attempt %d/%d failed: %s",
                        attempt, retries, type(e).__name__,
                    )
    except OSError as e:
        # The file vanished or is unreadable between the check and open().
        return {"error": str(e)}
    return {"error": last_error}
# --- core response fns ---
def response_text(message_id, chat_id, prompt):
    """Reply to a message with text produced by ``generate_llm``.

    Args:
        message_id: ID of the message being answered (quoted in the reply).
        chat_id: Chat to reply in.
        prompt: Text passed to ``generate_llm``.

    If generating or sending the reply raises, the failure is logged and a
    generic error message is sent to the chat instead.
    """
    try:
        msg = generate_llm(prompt)
        send_message(message_id, chat_id, msg)
    except Exception:
        # `except Exception` rather than a bare except, so SystemExit and
        # KeyboardInterrupt still propagate; the traceback is kept in the log.
        logging.exception("response_text failed")
        send_message(message_id, chat_id, "Error processing your request.")
def response_audio(message_id, chat_id, prompt):
    """Reply to a message with a generated voice note, falling back to text.

    Calls ``generate_voice_reply``; when it returns a result whose first
    element is truthy, that element is treated as the audio file path, the
    file is sent with ``send_audio`` and then deleted. Otherwise the reply
    is produced by ``response_text``. Any exception is logged and an error
    message is sent to the chat.

    Args:
        message_id: ID of the message being answered (quoted in the reply).
        chat_id: Chat to reply in.
        prompt: Text to turn into a voice reply.
    """
    logging.debug("response_audio prompt=%s", prompt)
    try:
        result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir)
        if result and result[0]:
            audio_path, _ = result
            try:
                sent = send_audio(message_id, chat_id, audio_path)
                if isinstance(sent, dict) and "error" in sent:
                    logging.warning("Voice reply could not be delivered for message %s", message_id)
            finally:
                # Clean up even when sending raises, and never let a failed
                # delete turn a delivered reply into an error message.
                try:
                    os.remove(audio_path)
                except FileNotFoundError:
                    pass
                except OSError:
                    logging.warning("Could not remove audio file %s", audio_path)
        else:
            response_text(message_id, chat_id, prompt)
    except Exception:
        logging.exception("audio error")
        send_message(message_id, chat_id, "Error generating audio. Try again later.")
def handle_image_generation(message_id, chat_id, prompt):
    """Generate an image for ``prompt`` and send it to the chat.

    ``generate_image`` is called with the prompt, the message ID (twice) and
    ``image_dir``, and its four results are unpacked as image, file path,
    returned prompt and URL. On success the file is sent with a caption
    holding the URL and the returned prompt, each paragraph wrapped in
    underscores for italics. A falsy image produces a failure message, and
    any exception is logged and reported to the chat.
    """
    try:
        img, path, ret_prompt, url = generate_image(prompt, message_id, message_id, image_dir)
        if not img:
            send_message(message_id, chat_id, "Image generation failed.")
            return

        # Italicise every non-blank paragraph of the returned prompt.
        italic_paragraphs = []
        for paragraph in ret_prompt.split("\n\n"):
            trimmed = paragraph.strip()
            if trimmed:
                italic_paragraphs.append(f"_{trimmed}_")
        formatted_ret_prompt = "\n\n".join(italic_paragraphs)

        image_caption = f"β¨ Image ready: {url}\n>{chr(8203)} {formatted_ret_prompt}"
        send_image(message_id, chat_id, path, caption=image_caption)
    except Exception as e:
        logging.error("Error in handle_image_generation: %s", e)
        send_message(message_id, chat_id, "Error generating image.")
# --- webhook ---
def _format_poll_tally(poll):
    """Return one ``"<n>. <option>: <votes>"`` line per option, numbered from 1."""
    return "\n".join(
        f"{i}. {opt}: {poll['votes'][i]}" for i, opt in enumerate(poll["options"], start=1)
    )


# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `app`.
# NOTE(review): several user-facing strings below look mis-encoded (for
# example "β¨" and "π€"); they are kept exactly as found. Confirm the
# intended characters against the original file.
# NOTE(review): generate_llm() and the requests calls are synchronous and
# run inside this async handler, so each one holds up the event loop.
async def whatsapp_webhook(request: Request):
    """Handle an incoming Green API webhook call and answer bot commands.

    Steps:
      1. Require an ``Authorization: Bearer <WEBHOOK_AUTH_TOKEN>`` header.
      2. Parse the JSON body; ignore everything except
         ``incomingMessageReceived`` events, native quoted messages,
         messages with mentions, and non-text messages.
      3. Dispatch on the message text: slash commands are answered inline
         (or queued for the worker in the case of images), a bare number
         is a vote when the chat has an active poll, and any other text is
         queued for a voice reply.

    Returns:
        ``{"success": True}`` for handled or ignored events, or a 400
        ``JSONResponse`` when the body is not JSON or lacks the sender,
        chat or message identifiers.

    Raises:
        HTTPException: 403 when the authorization header does not match.
    """
    # auth & parse
    auth = request.headers.get("Authorization", "").strip()
    if auth != f"Bearer {WEBHOOK_AUTH_TOKEN}":
        raise HTTPException(403, "Unauthorized")
    try:
        data = await request.json()
    except Exception:
        return JSONResponse({"error": "Invalid JSON"}, status_code=400)
    if not isinstance(data, dict):
        # Valid JSON, but not an object: previously an AttributeError (500).
        return JSONResponse({"error": "Invalid payload"}, status_code=400)

    if data.get("typeWebhook") != "incomingMessageReceived":
        return {"success": True}

    logging.debug("recv: %s", data)
    sd = data.get("senderData")
    md = data.get("messageData", {})
    if not isinstance(sd, dict) or not isinstance(md, dict):
        return JSONResponse({"error": "Invalid payload"}, status_code=400)
    chat = sd.get("chatId")
    mid = data.get("idMessage")
    if not isinstance(chat, str) or not chat or mid is None:
        # Previously a KeyError (500) on payloads missing these fields.
        return JSONResponse({"error": "Invalid payload"}, status_code=400)
    sender_jid = sd.get("sender")

    # drop any WhatsApp native quoted-message event
    if md.get("typeMessage") == "quotedMessage" or "quotedMessage" in md:
        logging.debug("skip native quotedMessage")
        return {"success": True}

    # extract text + contextInfo
    if "textMessageData" in md:
        text_data = md["textMessageData"] or {}
        body = (text_data.get("textMessage") or "").strip()
        ctx = text_data.get("contextInfo") or {}
    elif "extendedTextMessageData" in md:
        text_data = md["extendedTextMessageData"] or {}
        body = (text_data.get("text") or "").strip()
        ctx = text_data.get("contextInfo") or {}
    else:
        return {"success": True}

    # ignore native mentions & plain @123
    if ctx.get("mentionedJid") or ctx.get("mentionedJidList"):
        return {"success": True}
    if chat.endswith("@g.us") and re.search(r"@\d+", body):
        return {"success": True}

    # --- NEW COMMANDS ---
    low = body.lower()

    # HELP
    if low == "/help":
        help_text = (
            "π€ *Commands*: \n"
            "/help\n"
            "/summarize <text>\n"
            "/translate <lang>|<text>\n"
            "/joke\n"
            "/weather <location>\n"
            "/inspire\n"
            "/trivia β new trivia\n"
            "/answer β reveal answer\n"
            "/meme <text>\n"
            "/poll <Q>|<opt1>|<opt2>|β¦\n"
            "/results\n"
            "/endpoll\n"
            "/imagine <prompt>\n"
            "Or just send any text and Iβll reply by voice!"
        )
        send_message(mid, chat, help_text)
        return {"success": True}

    # SUMMARIZE
    if low.startswith("/summarize "):
        txt = body[len("/summarize "):].strip()
        summary = generate_llm(f"Summarize this text in one short paragraph:\n\n{txt}")
        send_message(mid, chat, summary)
        return {"success": True}

    # TRANSLATE
    if low.startswith("/translate "):
        part = body[len("/translate "):]
        if "|" not in part:
            send_message(mid, chat, "Use `/translate Language|Text`")
        else:
            lang, txt = part.split("|", 1)
            resp = generate_llm(f"Translate the following into {lang.strip()}:\n\n{txt.strip()}")
            send_message(mid, chat, resp)
        return {"success": True}

    # JOKE
    if low == "/joke":
        try:
            joke = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
            send_message(mid, chat, f"{joke['setup']}\n\n{joke['punchline']}")
        except Exception:
            # Any failure of the joke API (network, bad JSON, missing keys)
            # falls back to an LLM-generated joke.
            send_message(mid, chat, generate_llm("Tell me a short, funny joke."))
        return {"success": True}

    # WEATHER
    if low.startswith("/weather "):
        from urllib.parse import quote

        # The location is user input placed in the URL path: percent-encode
        # it so characters such as "?", "#" or "/" cannot alter the request.
        # "+" (used for spaces) and "," are left readable.
        loc = quote(body[len("/weather "):].strip().replace(" ", "+"), safe="+,")
        try:
            w = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
            send_message(mid, chat, w)
        except Exception:
            send_message(mid, chat, "Could not fetch weather.")
        return {"success": True}

    # INSPIRE
    if low == "/inspire":
        quote_text = generate_llm("Give me a short inspirational quote.")
        send_message(mid, chat, f"β¨ {quote_text}")
        return {"success": True}

    # TRIVIA
    if low == "/trivia":
        raw = generate_llm(
            "Generate a trivia question and answer in JSON: "
            "{\"question\":\"...\",\"answer\":\"...\"}"
        )
        obj = None
        try:
            obj = json.loads(raw)
        except (TypeError, ValueError):
            logging.debug("trivia response was not valid JSON")
        # Store the trivia only once both fields are known to exist, so a
        # later /answer cannot fail on a missing "answer" key.
        if isinstance(obj, dict) and "question" in obj and "answer" in obj:
            trivia_store[chat] = obj
            send_message(mid, chat, f"β {obj['question']}\nReply `/answer` to see the answer.")
        else:
            send_message(mid, chat, "Failed to generate trivia.")
        return {"success": True}

    # ANSWER
    if low == "/answer":
        if chat in trivia_store:
            ans = trivia_store.pop(chat)["answer"]
            send_message(mid, chat, f"π‘ Answer: {ans}")
        else:
            send_message(mid, chat, "No active trivia. Send `/trivia`.")
        return {"success": True}

    # MEME
    if low.startswith("/meme "):
        txt = body[len("/meme "):].strip()
        send_message(mid, chat, "π¨ Generating your meme...")
        task_queue.put({
            "type": "image",
            "message_id": mid,
            "chat_id": chat,
            "prompt": f"meme template with text: {txt}"
        })
        return {"success": True}

    # POLL
    if low.startswith("/poll "):
        parts = body[len("/poll "):].split("|")
        if len(parts) < 3:
            send_message(mid, chat, "Use `/poll Question|Option1|Option2[...]`")
        else:
            q = parts[0].strip()
            opts = [p.strip() for p in parts[1:]]
            # Options are numbered from 1, matching what users reply with.
            votes = {i: 0 for i in range(1, len(opts) + 1)}
            polls[chat] = {"question": q, "options": opts, "votes": votes, "voters": {}}
            txt = f"π *Poll:* {q}\n" + "\n".join(
                f"{i}. {opt}" for i, opt in enumerate(opts, start=1)
            ) + "\n\nReply with the *option number* to vote."
            send_message(mid, chat, txt)
        return {"success": True}

    # VOTE in poll
    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
    # "²" that int() rejects with ValueError.
    if chat in polls and body.isdecimal():
        n = int(body)
        p = polls[chat]
        if 1 <= n <= len(p["options"]):
            prev = p["voters"].get(sender_jid)
            if prev is not None:
                # The sender is changing their vote: undo the earlier one.
                p["votes"][prev] -= 1
            p["votes"][n] += 1
            p["voters"][sender_jid] = n
            send_message(mid, chat, f"β Vote recorded: {p['options'][n-1]}")
        return {"success": True}

    # POLL RESULTS
    if low == "/results":
        if chat in polls:
            p = polls[chat]
            txt = f"π *Results:* {p['question']}\n" + _format_poll_tally(p)
            send_message(mid, chat, txt)
        else:
            send_message(mid, chat, "No active poll.")
        return {"success": True}

    # END POLL
    if low == "/endpoll":
        if chat in polls:
            p = polls.pop(chat)
            txt = f"π *Final Results:* {p['question']}\n" + _format_poll_tally(p)
            send_message(mid, chat, txt)
        else:
            send_message(mid, chat, "No active poll.")
        return {"success": True}

    # IMAGINE (existing)
    if low.startswith("/imagine"):
        prompt = body[len("/imagine"):].strip()
        if not prompt:
            send_message(mid, chat, "Use `/imagine <prompt>`")
        else:
            send_message(mid, chat, "β¨ Your image is being generated. \nPlease wait...")
            task_queue.put({
                "type": "image",
                "message_id": mid,
                "chat_id": chat,
                "prompt": prompt
            })
        return {"success": True}

    # fallback -> voice reply
    task_queue.put({
        "type": "audio",
        "message_id": mid,
        "chat_id": chat,
        "prompt": body
    })
    return {"success": True}
def index():
    """Return the fixed status text reporting that the server is up."""
    status_text = "Server is running!"
    return status_text
if __name__ == "__main__":
    # Script entry point: serve `app` with uvicorn on all interfaces, port 7860.
    import uvicorn

    uvicorn.run(app=app, host="0.0.0.0", port=7860)