Spaces:
Running
Running
| import gradio as gr | |
| import requests | |
| import json | |
| import re | |
| from typing import List, Tuple, Optional | |
| from difflib import SequenceMatcher | |
| import string | |
| class AIChatbot: | |
| def __init__(self, database_url: str = "https://database-dhe2.onrender.com"): | |
| self.database_url = database_url | |
| self.conversation_history = [] | |
| # Simple conversation patterns | |
| self.greeting_patterns = [ | |
| r'\b(hi|hello|hey|good morning|good afternoon|good evening)\b', | |
| r'\b(how are you|how\'s it going|what\'s up)\b' | |
| ] | |
| self.help_patterns = [ | |
| r'\b(help|assist|support|guide)\b', | |
| r'\b(what can you do|what do you do|your capabilities)\b' | |
| ] | |
| self.thanks_patterns = [ | |
| r'\b(thank you|thanks|appreciate|grateful)\b' | |
| ] | |
| self.goodbye_patterns = [ | |
| r'\b(bye|goodbye|see you|farewell|exit|quit)\b' | |
| ] | |
| def is_greeting(self, message: str) -> bool: | |
| """Check if the message is a greeting""" | |
| message_lower = message.lower() | |
| for pattern in self.greeting_patterns: | |
| if re.search(pattern, message_lower): | |
| return True | |
| return False | |
| def is_help_request(self, message: str) -> bool: | |
| """Check if the message is asking for help""" | |
| message_lower = message.lower() | |
| for pattern in self.help_patterns: | |
| if re.search(pattern, message_lower): | |
| return True | |
| return False | |
| def is_thanks(self, message: str) -> bool: | |
| """Check if the message is expressing thanks""" | |
| message_lower = message.lower() | |
| for pattern in self.thanks_patterns: | |
| if re.search(pattern, message_lower): | |
| return True | |
| return False | |
| def is_goodbye(self, message: str) -> bool: | |
| """Check if the message is a goodbye""" | |
| message_lower = message.lower() | |
| for pattern in self.goodbye_patterns: | |
| if re.search(pattern, message_lower): | |
| return True | |
| return False | |
| def get_greeting_response(self) -> str: | |
| """Generate a greeting response""" | |
| responses = [ | |
| "Hello! I'm your AI assistant. How can I help you today?", | |
| "Hi there! I'm here to assist you with any questions you might have.", | |
| "Hello! Welcome! I can help you with general conversation or answer specific questions from our database.", | |
| "Hey! Nice to meet you! What can I do for you today?" | |
| ] | |
| import random | |
| return random.choice(responses) | |
| def get_help_response(self) -> str: | |
| """Generate a help response""" | |
| return """I'm an AI chatbot that can help you in two ways: | |
| 1. **General Conversation**: I can chat with you about various topics, answer greetings, and have friendly conversations. | |
| 2. **Specific Questions**: I can search our database for specific information and provide detailed answers to your questions. | |
| **Smart Learning**: If I can't find an answer to your question, I'll automatically save it for review so we can improve our knowledge base and provide better answers in the future. | |
| Just type your question or start a conversation, and I'll do my best to help you!""" | |
| def get_thanks_response(self) -> str: | |
| """Generate a thanks response""" | |
| responses = [ | |
| "You're welcome! I'm happy to help.", | |
| "My pleasure! Feel free to ask if you need anything else.", | |
| "Glad I could assist you! Is there anything else you'd like to know?", | |
| "You're very welcome! I'm here whenever you need help." | |
| ] | |
| import random | |
| return random.choice(responses) | |
| def get_goodbye_response(self) -> str: | |
| """Generate a goodbye response""" | |
| responses = [ | |
| "Goodbye! Have a great day!", | |
| "See you later! Take care!", | |
| "Farewell! Feel free to come back anytime.", | |
| "Bye! I enjoyed chatting with you!" | |
| ] | |
| import random | |
| return random.choice(responses) | |
| def save_unanswered_question(self, question: str) -> bool: | |
| """Save unanswered question to the database - matches your exact API""" | |
| print(f"Attempting to save unanswered question: '{question}'") | |
| try: | |
| # Use only the correct endpoint that matches your server | |
| endpoint = f"{self.database_url}/unanswered_questions" | |
| print(f"Using endpoint: {endpoint}") | |
| # Send POST request with only question (matching your server code) | |
| post_data = { | |
| "question": question | |
| } | |
| print(f"POST data: {post_data}") | |
| response = requests.post( | |
| endpoint, | |
| json=post_data, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=10 | |
| ) | |
| print(f"POST response status: {response.status_code}") | |
| print(f"POST response text: {response.text}") | |
| if response.status_code == 200: | |
| print(f"Successfully saved question to {endpoint}") | |
| return True | |
| else: | |
| print(f"Failed to save question. Status: {response.status_code}, Response: {response.text}") | |
| return False | |
| except requests.exceptions.RequestException as e: | |
| print(f"Request failed: {e}") | |
| return False | |
| except Exception as e: | |
| print(f"Unexpected error saving unanswered question: {e}") | |
| return False | |
| def _get_timestamp(self) -> str: | |
| """Get current timestamp in ISO format""" | |
| from datetime import datetime | |
| return datetime.now().isoformat() | |
| def _normalize_text(self, text: str) -> str: | |
| """Normalize text for better matching""" | |
| # Convert to lowercase | |
| text = text.lower() | |
| # Remove punctuation | |
| text = text.translate(str.maketrans('', '', string.punctuation)) | |
| # Remove extra whitespace | |
| text = ' '.join(text.split()) | |
| # Additional normalization for better matching | |
| # Replace common variations | |
| replacements = { | |
| 'what are the': 'what', | |
| 'what is the': 'what', | |
| 'what are': 'what', | |
| 'what is': 'what', | |
| 'how do i': 'how', | |
| 'how can i': 'how', | |
| 'how to': 'how', | |
| 'when is the': 'when', | |
| 'when are the': 'when', | |
| 'where is the': 'where', | |
| 'where are the': 'where', | |
| 'who is the': 'who', | |
| 'who are the': 'who' | |
| } | |
| for old, new in replacements.items(): | |
| if text.startswith(old): | |
| text = text.replace(old, new, 1) | |
| break | |
| return text | |
| def _extract_keywords(self, text: str) -> List[str]: | |
| """Extract important keywords from text with enhanced processing""" | |
| # Extended stop words to ignore | |
| stop_words = { | |
| 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', | |
| 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', | |
| 'will', 'would', 'could', 'should', 'may', 'might', 'can', 'what', 'how', 'when', 'where', 'why', | |
| 'who', 'which', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they', | |
| 'me', 'him', 'her', 'us', 'them', 'my', 'your', 'his', 'her', 'its', 'our', 'their', | |
| 'there', 'here', 'some', 'any', 'all', 'each', 'every', 'much', 'many', 'more', 'most', | |
| 'very', 'just', 'only', 'also', 'even', 'still', 'yet', 'so', 'too', 'well', 'now', 'then' | |
| } | |
| # Normalize and split into words | |
| words = self._normalize_text(text).split() | |
| # Enhanced keyword extraction | |
| keywords = [] | |
| for word in words: | |
| # Filter out stop words and very short words | |
| if word not in stop_words and len(word) > 2: | |
| # Add the word | |
| keywords.append(word) | |
| # Add common variations and stems | |
| if word.endswith('s') and len(word) > 3: | |
| keywords.append(word[:-1]) # Remove 's' for plurals | |
| if word.endswith('ing') and len(word) > 4: | |
| keywords.append(word[:-3]) # Remove 'ing' | |
| if word.endswith('ed') and len(word) > 3: | |
| keywords.append(word[:-2]) # Remove 'ed' | |
| return list(set(keywords)) # Remove duplicates | |
| def _calculate_similarity(self, text1: str, text2: str) -> float: | |
| """Calculate similarity between two texts using advanced methods""" | |
| norm1 = self._normalize_text(text1) | |
| norm2 = self._normalize_text(text2) | |
| # Method 1: Sequence matcher on normalized text | |
| sequence_similarity = SequenceMatcher(None, norm1, norm2).ratio() | |
| # Method 2: Enhanced keyword overlap with stemming | |
| keywords1 = set(self._extract_keywords(text1)) | |
| keywords2 = set(self._extract_keywords(text2)) | |
| keyword_similarity = 0.0 | |
| if keywords1 and keywords2: | |
| intersection = keywords1.intersection(keywords2) | |
| union = keywords1.union(keywords2) | |
| keyword_similarity = len(intersection) / len(union) if union else 0.0 | |
| # Method 3: Substring containment (both directions) | |
| contains_similarity = 0.0 | |
| if norm1 in norm2: | |
| contains_similarity = max(contains_similarity, 0.9 * (len(norm1) / len(norm2))) | |
| if norm2 in norm1: | |
| contains_similarity = max(contains_similarity, 0.9 * (len(norm2) / len(norm1))) | |
| # Method 4: Enhanced word order similarity | |
| words1 = norm1.split() | |
| words2 = norm2.split() | |
| word_order_similarity = 0.0 | |
| if words1 and words2: | |
| # Check for common word sequences (exact order) | |
| common_sequences = 0 | |
| max_len = min(len(words1), len(words2)) | |
| for i in range(max_len): | |
| if words1[i] == words2[i]: | |
| common_sequences += 1 | |
| exact_order_similarity = common_sequences / max_len if max_len > 0 else 0.0 | |
| # Check for word order flexibility (any order) | |
| set1 = set(words1) | |
| set2 = set(words2) | |
| common_words = set1.intersection(set2) | |
| total_words = set1.union(set2) | |
| flexible_order_similarity = len(common_words) / len(total_words) if total_words else 0.0 | |
| # Check for phrase patterns (like "available courses" vs "courses available") | |
| phrase_similarity = self._calculate_phrase_order_similarity(words1, words2) | |
| # Combine different word order methods | |
| word_order_similarity = ( | |
| exact_order_similarity * 0.3 + | |
| flexible_order_similarity * 0.5 + | |
| phrase_similarity * 0.2 | |
| ) | |
| # Method 5: Semantic similarity using word relationships | |
| semantic_similarity = self._calculate_semantic_similarity(keywords1, keywords2) | |
| # Method 6: Length similarity (shorter queries should match longer answers) | |
| length_similarity = 0.0 | |
| if len(norm1) > 0 and len(norm2) > 0: | |
| length_ratio = min(len(norm1), len(norm2)) / max(len(norm1), len(norm2)) | |
| length_similarity = length_ratio * 0.3 # Lower weight for length | |
| # Method 7: Phrase matching (for common phrases) | |
| phrase_similarity = self._calculate_phrase_similarity(norm1, norm2) | |
| # Combine all methods with optimized weights | |
| final_similarity = ( | |
| sequence_similarity * 0.25 + | |
| keyword_similarity * 0.30 + | |
| contains_similarity * 0.20 + | |
| word_order_similarity * 0.10 + | |
| semantic_similarity * 0.10 + | |
| length_similarity * 0.03 + | |
| phrase_similarity * 0.02 | |
| ) | |
| return min(final_similarity, 1.0) # Cap at 1.0 | |
| def _calculate_semantic_similarity(self, keywords1: set, keywords2: set) -> float: | |
| """Calculate semantic similarity using word relationships""" | |
| if not keywords1 or not keywords2: | |
| return 0.0 | |
| # Common semantic relationships | |
| semantic_groups = { | |
| 'money': {'cost', 'price', 'tuition', 'fee', 'payment', 'money', 'financial', 'aid', 'scholarship'}, | |
| 'time': {'deadline', 'when', 'time', 'date', 'schedule', 'duration', 'period'}, | |
| 'contact': {'contact', 'phone', 'email', 'address', 'office', 'reach', 'call'}, | |
| 'requirements': {'requirement', 'need', 'required', 'must', 'prerequisite', 'condition'}, | |
| 'application': {'apply', 'application', 'submit', 'process', 'procedure'}, | |
| 'programs': {'program', 'course', 'major', 'degree', 'study', 'academic', 'available', 'offered', 'listings'}, | |
| 'admission': {'admission', 'admit', 'accept', 'enroll', 'entry', 'enter'}, | |
| 'courses': {'course', 'courses', 'program', 'programs', 'major', 'majors', 'degree', 'degrees', 'available', 'offered', 'listings', 'what', 'which'} | |
| } | |
| # Check if keywords belong to the same semantic group | |
| semantic_score = 0.0 | |
| for group, words in semantic_groups.items(): | |
| group1_match = any(keyword in words for keyword in keywords1) | |
| group2_match = any(keyword in words for keyword in keywords2) | |
| if group1_match and group2_match: | |
| semantic_score += 0.3 | |
| return min(semantic_score, 1.0) | |
| def _calculate_phrase_similarity(self, text1: str, text2: str) -> float: | |
| """Calculate similarity based on common phrases""" | |
| # Common phrases that should match | |
| common_phrases = [ | |
| ('admission requirements', 'requirements admission'), | |
| ('financial aid', 'aid financial'), | |
| ('tuition cost', 'cost tuition'), | |
| ('application deadline', 'deadline application'), | |
| ('contact admissions', 'admissions contact'), | |
| ('gpa requirement', 'requirement gpa'), | |
| ('academic requirements', 'requirements academic') | |
| ] | |
| phrase_score = 0.0 | |
| for phrase1, phrase2 in common_phrases: | |
| if (phrase1 in text1 and phrase1 in text2) or (phrase2 in text1 and phrase2 in text2): | |
| phrase_score += 0.5 | |
| elif (phrase1 in text1 and phrase2 in text2) or (phrase2 in text1 and phrase1 in text2): | |
| phrase_score += 0.4 | |
| return min(phrase_score, 1.0) | |
| def _calculate_phrase_order_similarity(self, words1: List[str], words2: List[str]) -> float: | |
| """Calculate similarity based on phrase order flexibility""" | |
| if not words1 or not words2: | |
| return 0.0 | |
| # Common phrase patterns that should match regardless of order | |
| phrase_patterns = [ | |
| (['available', 'courses'], ['courses', 'available']), | |
| (['admission', 'requirements'], ['requirements', 'admission']), | |
| (['financial', 'aid'], ['aid', 'financial']), | |
| (['tuition', 'cost'], ['cost', 'tuition']), | |
| (['application', 'deadline'], ['deadline', 'application']), | |
| (['contact', 'admissions'], ['admissions', 'contact']), | |
| (['gpa', 'requirement'], ['requirement', 'gpa']), | |
| (['academic', 'requirements'], ['requirements', 'academic']), | |
| (['programs', 'available'], ['available', 'programs']), | |
| (['what', 'programs'], ['programs', 'what']), | |
| (['what', 'courses'], ['courses', 'what']), | |
| (['what', 'available'], ['available', 'what']) | |
| ] | |
| # Check for phrase pattern matches | |
| for pattern1, pattern2 in phrase_patterns: | |
| # Check if words1 contains pattern1 and words2 contains pattern2 | |
| if (all(word in words1 for word in pattern1) and | |
| all(word in words2 for word in pattern2)): | |
| return 0.8 | |
| # Check if words1 contains pattern2 and words2 contains pattern1 | |
| if (all(word in words1 for word in pattern2) and | |
| all(word in words2 for word in pattern1)): | |
| return 0.8 | |
| # Check for partial phrase matches | |
| for pattern1, pattern2 in phrase_patterns: | |
| # Check if at least 2 words from each pattern are present | |
| words1_matches = sum(1 for word in pattern1 if word in words1) | |
| words2_matches = sum(1 for word in pattern2 if word in words2) | |
| if words1_matches >= 2 and words2_matches >= 2: | |
| return 0.6 | |
| return 0.0 | |
| def _find_best_match(self, user_question: str, database_questions: List[str], threshold: float = 0.25) -> Optional[str]: | |
| """Find the best matching question from database with improved logic""" | |
| if not database_questions: | |
| return None | |
| best_match = None | |
| best_score = 0.0 | |
| all_scores = [] | |
| # Calculate similarity for all questions | |
| for db_question in database_questions: | |
| similarity = self._calculate_similarity(user_question, db_question) | |
| all_scores.append((db_question, similarity)) | |
| if similarity > best_score: | |
| best_score = similarity | |
| best_match = db_question | |
| # Sort by similarity score | |
| all_scores.sort(key=lambda x: x[1], reverse=True) | |
| # If the best score is above threshold, return it | |
| if best_score >= threshold: | |
| return best_match | |
| # If no single match is above threshold, try adaptive threshold | |
| if all_scores: | |
| # Use the top score if it's reasonably close to threshold | |
| top_score = all_scores[0][1] | |
| if top_score >= threshold * 0.8: # 80% of threshold | |
| return all_scores[0][0] | |
| # Last resort: if user question is very short, be more lenient | |
| if len(user_question.split()) <= 3 and all_scores: | |
| # For short queries, use a lower threshold | |
| if all_scores[0][1] >= 0.15: | |
| return all_scores[0][0] | |
| return None | |
| def _generate_query_variants(self, question: str) -> List[str]: | |
| """Generate lightweight query variants to improve matching against FAQs""" | |
| variants: List[str] = [] | |
| original = question.strip() | |
| variants.append(original) | |
| # Normalized | |
| norm = self._normalize_text(original) | |
| variants.append(norm) | |
| # Remove trailing punctuation and repeated spaces already handled by normalize | |
| # Simple lemmatization-ish tweaks for common cases | |
| rules = [ | |
| (r"\btakes\b", "take"), | |
| (r"\btake\b", "takes"), | |
| (r"\bdoes\b", "do"), | |
| (r"\bdo\b", "does"), | |
| (r"\bis\b", "are"), | |
| (r"\bare\b", "is"), | |
| ] | |
| for pattern, repl in rules: | |
| try: | |
| v = re.sub(pattern, repl, norm) | |
| if v not in variants: | |
| variants.append(v) | |
| except Exception: | |
| pass | |
| # Last-word singular/plural toggle | |
| words = norm.split() | |
| if words: | |
| last = words[-1] | |
| if len(last) > 3 and last.endswith('s'): | |
| alt = ' '.join(words[:-1] + [last[:-1]]) | |
| if alt not in variants: | |
| variants.append(alt) | |
| else: | |
| alt = ' '.join(words[:-1] + [last + 's']) | |
| if alt not in variants: | |
| variants.append(alt) | |
| # De-duplicate while preserving order | |
| seen = set() | |
| unique_variants: List[str] = [] | |
| for v in variants: | |
| if v not in seen and v: | |
| seen.add(v) | |
| unique_variants.append(v) | |
| return unique_variants | |
| def fetch_from_database(self, question: str) -> str: | |
| """Fetch answer from the database with smart matching""" | |
| try: | |
| # First, try to get all available questions for smart matching | |
| all_questions = self._get_all_questions() | |
| # If we have all questions, try smart matching first | |
| if all_questions: | |
| best_match = self._find_best_match(question, all_questions) | |
| if best_match: | |
| # Try to get answer for the best matching question | |
| answer = self._get_answer_for_question(best_match) | |
| if answer and not self._is_no_answer_response(answer): | |
| return answer | |
| # Fallback to original method if smart matching doesn't work | |
| endpoints = [ | |
| f"{self.database_url}/faqs", | |
| f"{self.database_url}/faq", | |
| f"{self.database_url}/search", | |
| f"{self.database_url}/query", | |
| f"{self.database_url}/api/faq" | |
| ] | |
| param_names = ["question", "q"] | |
| variants = self._generate_query_variants(question) | |
| for endpoint in endpoints: | |
| for variant in variants: | |
| for param_name in param_names: | |
| # Try GET | |
| try: | |
| response = requests.get( | |
| endpoint, | |
| params={param_name: variant}, | |
| timeout=10 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if isinstance(data, dict): | |
| answer = data.get('answer', data.get('response', str(data))) | |
| if answer and answer.strip() and not self._is_no_answer_response(answer): | |
| return answer | |
| elif isinstance(data, list) and len(data) > 0: | |
| answer = str(data[0]) | |
| if answer and answer.strip() and not self._is_no_answer_response(answer): | |
| return answer | |
| else: | |
| answer = str(data) | |
| if answer and answer.strip() and not self._is_no_answer_response(answer): | |
| return answer | |
| except Exception: | |
| pass | |
| # Try POST JSON | |
| try: | |
| response = requests.post( | |
| endpoint, | |
| json={param_name: variant}, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=10 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if isinstance(data, dict): | |
| answer = data.get('answer', data.get('response', str(data))) | |
| if answer and answer.strip() and not self._is_no_answer_response(answer): | |
| return answer | |
| elif isinstance(data, list) and len(data) > 0: | |
| answer = str(data[0]) | |
| if answer and answer.strip() and not self._is_no_answer_response(answer): | |
| return answer | |
| else: | |
| answer = str(data) | |
| if answer and answer.strip() and not self._is_no_answer_response(answer): | |
| return answer | |
| except Exception: | |
| pass | |
| # If no answer found, save the question as unanswered | |
| saved = self.save_unanswered_question(question) | |
| if saved: | |
| return "I'm sorry, I couldn't find a specific answer to your question in our database. I've saved your question for review, and we'll work on providing a better answer in the future. Could you try rephrasing your question or ask me something else?" | |
| else: | |
| return "I'm sorry, I couldn't find a specific answer to your question in our database. I tried to save your question for review, but there was an issue with our database connection. Could you try rephrasing your question or ask me something else?" | |
| except requests.exceptions.Timeout: | |
| # Save the question even if there's a timeout | |
| saved = self.save_unanswered_question(question) | |
| if saved: | |
| return "I'm sorry, the database is taking too long to respond. I've saved your question for review. Please try again in a moment." | |
| else: | |
| return "I'm sorry, the database is taking too long to respond. Please try again in a moment." | |
| except requests.exceptions.ConnectionError: | |
| # Save the question even if there's a connection error | |
| saved = self.save_unanswered_question(question) | |
| if saved: | |
| return "I'm sorry, I'm having trouble connecting to our database right now. I've saved your question for review. Please try again later." | |
| else: | |
| return "I'm sorry, I'm having trouble connecting to our database right now. Please try again later." | |
| except Exception as e: | |
| # Save the question even if there's an unexpected error | |
| saved = self.save_unanswered_question(question) | |
| if saved: | |
| return f"I encountered an error while searching our database: {str(e)}. I've saved your question for review. Please try again." | |
| else: | |
| return f"I encountered an error while searching our database: {str(e)}. Please try again." | |
| def _get_all_questions(self) -> List[str]: | |
| """Get all available questions from the database for smart matching""" | |
| try: | |
| # Try different endpoints to get all questions | |
| endpoints = [ | |
| f"{self.database_url}/questions", | |
| f"{self.database_url}/faq/all", | |
| f"{self.database_url}/api/questions", | |
| f"{self.database_url}/all_questions" | |
| ] | |
| for endpoint in endpoints: | |
| try: | |
| response = requests.get(endpoint, timeout=10) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if isinstance(data, list): | |
| return [str(item) for item in data] | |
| elif isinstance(data, dict) and 'questions' in data: | |
| return [str(q) for q in data['questions']] | |
| except: | |
| continue | |
| return [] | |
| except: | |
| return [] | |
| def _get_answer_for_question(self, question: str) -> Optional[str]: | |
| """Get answer for a specific question""" | |
| try: | |
| endpoints = [ | |
| f"{self.database_url}/faqs", | |
| f"{self.database_url}/faq", | |
| f"{self.database_url}/search", | |
| f"{self.database_url}/query", | |
| f"{self.database_url}/api/faq" | |
| ] | |
| param_names = ["question", "q"] | |
| variants = self._generate_query_variants(question) | |
| for endpoint in endpoints: | |
| for variant in variants: | |
| for param_name in param_names: | |
| try: | |
| response = requests.get( | |
| endpoint, | |
| params={param_name: variant}, | |
| timeout=10 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if isinstance(data, dict): | |
| return data.get('answer', data.get('response', str(data))) | |
| elif isinstance(data, list) and len(data) > 0: | |
| return str(data[0]) | |
| else: | |
| return str(data) | |
| except Exception: | |
| pass | |
| try: | |
| response = requests.post( | |
| endpoint, | |
| json={param_name: variant}, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=10 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if isinstance(data, dict): | |
| return data.get('answer', data.get('response', str(data))) | |
| elif isinstance(data, list) and len(data) > 0: | |
| return str(data[0]) | |
| else: | |
| return str(data) | |
| except Exception: | |
| pass | |
| return None | |
| except: | |
| return None | |
| def _is_no_answer_response(self, answer: str) -> bool: | |
| """Check if the response indicates no answer was found""" | |
| no_answer_indicators = [ | |
| "no answer", | |
| "not found", | |
| "no results", | |
| "no data", | |
| "empty", | |
| "null", | |
| "none", | |
| "i don't know", | |
| "i don't have", | |
| "cannot find", | |
| "unable to find" | |
| ] | |
| answer_lower = answer.lower().strip() | |
| return any(indicator in answer_lower for indicator in no_answer_indicators) | |
| def chat(self, message: str, history: List[List[str]]) -> str: | |
| """Main chat function""" | |
| if not message.strip(): | |
| return "Please enter a message so I can help you!" | |
| # Store conversation history | |
| self.conversation_history.append(("user", message)) | |
| # Check for conversation patterns | |
| if self.is_greeting(message): | |
| response = self.get_greeting_response() | |
| elif self.is_help_request(message): | |
| response = self.get_help_response() | |
| elif self.is_thanks(message): | |
| response = self.get_thanks_response() | |
| elif self.is_goodbye(message): | |
| response = self.get_goodbye_response() | |
| else: | |
| # Try to fetch from database | |
| response = self.fetch_from_database(message) | |
| # Store bot response | |
| self.conversation_history.append(("bot", response)) | |
| return response | |
| # Initialize the chatbot | |
| chatbot = AIChatbot() | |
| # Create Gradio interface | |
| def create_interface(): | |
| with gr.Blocks( | |
| title="AI Chatbot" | |
| ) as interface: | |
| gr.Markdown( | |
| """ | |
| # 🤖 AI Chatbot Assistant | |
| Welcome! I'm your AI assistant that can help you with: | |
| - **General conversation** and friendly chat | |
| - **Specific questions** answered from our knowledge database | |
| Just type your message below and I'll do my best to help you! | |
| """ | |
| ) | |
| # Chat interface | |
| chatbot_interface = gr.ChatInterface( | |
| fn=chatbot.chat, | |
| title="Chat with AI Assistant", | |
| description="Ask me anything or just have a conversation!", | |
| examples=[ | |
| "Hello!", | |
| "What can you help me with?", | |
| "How do I contact support?", | |
| "What are your services?", | |
| "Thank you for your help!" | |
| ], | |
| cache_examples=False | |
| ) | |
| # Footer | |
| gr.Markdown( | |
| """ | |
| --- | |
| **Note**: This chatbot can handle general conversation and search our database for specific information. | |
| If you don't get the answer you're looking for, try rephrasing your question! | |
| """ | |
| ) | |
| return interface | |
| # Launch the application | |
| if __name__ == "__main__": | |
| interface = create_interface() | |
| interface.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| debug=True | |
| ) | |