Spaces:
Running
Running
| import gradio as gr | |
| import requests | |
| import json | |
| import re | |
| from typing import List, Tuple, Optional | |
| from difflib import SequenceMatcher | |
| import string | |
| class AIChatbot: | |
| def __init__(self, database_url: str = "https://database-dhe2.onrender.com"): | |
| self.database_url = database_url | |
| self.conversation_history = [] | |
| # Profanity filter - list of bad words to filter (English and Tagalog) | |
| self.bad_words = { | |
| # English bad words | |
| 'damn', 'hell', 'crap', 'suck', 'idiot', 'fool', 'jerk', 'loser', 'dumb', 'trash', | |
| 'butt', 'freak', 'nut', 'moron', 'dummy', 'bozo', 'twit', 'dope', 'dumbass', | |
| 'poophead', 'jerkoff', 'bugger', 'wanker', 'tosser', 'bastard', 'scum', 'slime', | |
| 'creep', 'brat', 'dweeb', 'goon', 'booby', 'puke', 'vomit', 'dung', 'sap', | |
| 'clutz', 'knob', 'prick', 'ass', 'shit', 'fuck', 'cock', 'tits', 'pussy', | |
| 'cunt', 'slut', 'bitch', 'whore', 'skank', 'stupid', | |
| 'asshole', 'dick', 'douche', 'scumbag', 'slimeball', 'douchebag', 'knobhead', | |
| 'numskull', 'halfwit', 'nincompoop', 'blockhead', 'dimwit', 'nitwit', 'simpleton', | |
| 'dunce', 'buffoon', 'doofus', 'clod', 'goober', 'jerkface', 'schmuck', 'scoundrel', | |
| 'miscreant', 'rat', 'git', 'wazzock', 'pillock', 'prat', 'plonker', 'div', 'bellend', | |
| 'tosserhead', 'twitbrain', 'sapbrain', 'knucklehead', 'dopey', 'boob', 'dingbat', 'oaf', | |
| 'ninnyhammer', 'chucklehead', 'saphead', 'pukehead', 'fuckface', 'assface', 'dickhead', | |
| 'cockhead', 'shithead', 'twatface', 'doucheface', 'bastardface', 'motherfucker', 'shitbag', | |
| 'cocksucker', 'jackass', 'wankerface', 'tosserface', 'arsehole', 'shitstain', 'assholeface', | |
| 'prickface', 'dumbfuck', 'fucknut', 'twatwaffle', 'shitbagger', 'dickweed', 'cumdump', | |
| 'asswipe', 'cockwomble', 'bollocks', 'twat', 'dick', 'fucking', | |
| # Tagalog bad words | |
| 'gago', 'putangina', 'putang', 'hayop', 'lintik', 'walang', 'hiya', 'bobo', 'leche', | |
| 'punyeta', 'sira', 'ulo', 'bwisit', 'pakshet', 'tarantado', 'ulol', 'buwisit', | |
| 'hudas', 'kupal', 'shet', 'tae', 'tanga', 'tangina', 'bastos', 'maldita', 'loko', | |
| 'asar', 'pekpek', 'burat', 'kantot', 'puke', 'kantotin', 'tarantadoin', 'ulolan', | |
| 'bading', 'bakla', 'unggoy', 'asarin', 'bastusin', 'malditahin', 'buratin', 'pekpekin', | |
| 'pukein', 'tangain', 'gagoan', 'tarantadohin', 'ina' | |
| } | |
| # Bad phrases (multi-word profanity - English and Tagalog) | |
| self.bad_phrases = { | |
| # English phrases | |
| 'fuck you', 'shit you', 'damn you', 'hell you', | |
| 'you bastard', 'you bitch', 'you dick', 'you prick', 'you cunt', 'you slut', 'you whore', | |
| 'you jerk', 'you idiot', 'you fool', 'you moron', 'you dumbass', 'you douche', 'you twat', | |
| 'you bugger', 'you wanker', 'you tosser', 'you poophead', 'you scumbag', 'you slimeball', | |
| 'you douchebag', 'you knobhead', 'you bozo', 'you twit', 'you dope', 'you numskull', | |
| 'you halfwit', 'you nincompoop', 'you blockhead', 'you dimwit', 'you nitwit', 'you simpleton', | |
| 'you dunce', 'you buffoon', 'you doofus', 'you clod', 'you goober', 'you jerkface', | |
| 'you schmuck', 'you scoundrel', 'you miscreant', 'you rat', 'you puke', 'you vomit', | |
| 'you dung', 'you ass', 'you tits', 'you pussy', 'you cock', 'you fuckface', 'you assface', | |
| 'you dickhead', 'you cockhead', 'you shithead', 'you twatface', 'you knobhead', 'you doucheface', | |
| 'you loser', 'you bastardface', 'you motherfucker', 'you shitbag', 'you cocksucker', | |
| 'you jackass', 'you wankerface', 'you tosserface', 'you arsehole', 'you asshole', 'you freak', 'you nut', | |
| 'you scum', 'you creep', 'you brat', 'you dweeb', 'you goon', 'you pukehead', 'you shitstain', | |
| 'you assholeface', 'you prickface', 'you dumbfuck', 'you fucknut', 'you twatwaffle', | |
| 'you shitbagger', 'you dickweed', 'you cumdump', 'you asswipe', 'you cockwomble', | |
| 'you bollocks', 'you wazzock', 'you pillock', 'you plonker', 'you div', 'you bellend', | |
| 'you twitbrain', 'you motherfucking idiot', 'fuckig stupid', | |
| # Tagalog phrases | |
| 'walang hiya', 'sira ulo', 'walang kwenta', 'walang silbe', | |
| 'putang ina', 'putang ina ka', 'putang ina mo', | |
| 'gago ka', 'gago mo', 'gago-gago', 'gago-gago ka', 'gago-gago mo', 'gagoan ka', 'gagoan mo', | |
| 'tanga ka', 'tanga mo', 'tanga-tanga', 'tanga-tanga ka', 'tanga-tanga mo', 'tangain ka', 'tangain mo', 'tanga-in ka', 'tanga-in mo', | |
| 'bobo ka', 'bobo mo', 'bobo-bobo', 'bobo-bobo ka', 'bobo-bobo mo', 'bobo-in ka', 'bobo-in mo', | |
| 'ulol ka', 'ulol mo', 'ulol-ulol', 'ulol-ulol ka', 'ulol-ulol mo', 'ulolan ka', 'ulolan mo', 'ulol-in ka', 'ulol-in mo', | |
| 'tarantado ka', 'tarantado mo', 'tarantado-tarantado', 'tarantado-tarantado ka', 'tarantado-tarantado mo', | |
| 'tarantadoin ka', 'tarantadoin mo', 'tarantado-in ka', 'tarantado-in mo', 'tarantadohin ka', 'tarantadohin mo', | |
| 'bastos ka', 'bastos mo', 'bastusin ka', 'bastusin mo', | |
| 'maldita ka', 'maldita mo', 'malditahin ka', 'malditahin mo', | |
| 'loko ka', 'loko mo', 'loko-loko', 'loko-loko ka', 'loko-loko mo', | |
| 'asar ka', 'asar mo', 'asarin ka', 'asarin mo', | |
| 'pekpek ka', 'pekpek mo', 'pekpekin ka', 'pekpekin mo', | |
| 'burat ka', 'burat mo', 'buratin ka', 'buratin mo', | |
| 'kantot ka', 'kantot mo', 'kantotin ka', 'kantotin mo', | |
| 'puke ka', 'puke mo', 'pukein ka', 'pukein mo', | |
| 'bading ka', 'bading mo', | |
| 'bakla ka', 'bakla mo', | |
| 'unggoy ka', 'unggoy mo' | |
| } | |
| # Simple conversation patterns | |
| self.greeting_patterns = [ | |
| r'\b(hi|hello|hey|good morning|good afternoon|good evening)\b', | |
| r'\b(how are you|how\'s it going|what\'s up)\b' | |
| ] | |
| self.help_patterns = [ | |
| r'\b(help|assist|support|guide)\b', | |
| r'\b(what can you do|what do you do|your capabilities)\b' | |
| ] | |
| self.thanks_patterns = [ | |
| r'\b(thank you|thanks|appreciate|grateful)\b' | |
| ] | |
| self.goodbye_patterns = [ | |
| r'\b(bye|goodbye|see you|farewell|exit|quit)\b' | |
| ] | |
| def is_greeting(self, message: str) -> bool: | |
| """Check if the message is a greeting""" | |
| message_lower = message.lower() | |
| for pattern in self.greeting_patterns: | |
| if re.search(pattern, message_lower): | |
| return True | |
| return False | |
| def is_help_request(self, message: str) -> bool: | |
| """Check if the message is asking for help""" | |
| message_lower = message.lower() | |
| for pattern in self.help_patterns: | |
| if re.search(pattern, message_lower): | |
| return True | |
| return False | |
| def is_thanks(self, message: str) -> bool: | |
| """Check if the message is expressing thanks""" | |
| message_lower = message.lower() | |
| for pattern in self.thanks_patterns: | |
| if re.search(pattern, message_lower): | |
| return True | |
| return False | |
| def is_goodbye(self, message: str) -> bool: | |
| """Check if the message is a goodbye""" | |
| message_lower = message.lower() | |
| for pattern in self.goodbye_patterns: | |
| if re.search(pattern, message_lower): | |
| return True | |
| return False | |
| def contains_profanity(self, message: str) -> bool: | |
| """Check if the message contains any profanity""" | |
| # Normalize message: convert to lowercase | |
| message_lower = message.lower() | |
| # First, check for bad phrases (multi-word profanity like "walang hiya", "sira ulo", "gago-gago") | |
| for phrase in self.bad_phrases: | |
| # Replace hyphens with spaces for better matching (handles "gago-gago" as "gago gago") | |
| phrase_normalized = phrase.replace('-', ' ') | |
| # Remove punctuation but keep spaces, normalize whitespace | |
| phrase_clean = re.sub(r'[^\w\s]', '', phrase_normalized) | |
| phrase_clean = re.sub(r'\s+', ' ', phrase_clean).strip() | |
| # Normalize message similarly - replace hyphens with spaces | |
| message_normalized = message_lower.replace('-', ' ') | |
| message_clean_phrase = re.sub(r'[^\w\s]', '', message_normalized) | |
| message_clean_phrase = re.sub(r'\s+', ' ', message_clean_phrase).strip() | |
| # Check if phrase appears in message (with flexible spacing) | |
| # Split phrase into words and create pattern that matches with any whitespace | |
| phrase_words = phrase_clean.split() | |
| if len(phrase_words) > 0: | |
| # Create pattern that matches words with one or more spaces between them | |
| # Using word boundaries to ensure whole words are matched | |
| phrase_pattern = r'\b' + r'\s+'.join(re.escape(word) for word in phrase_words) + r'\b' | |
| if re.search(phrase_pattern, message_clean_phrase, re.IGNORECASE): | |
| return True | |
| # Normalize common obfuscation characters | |
| # Replace common character substitutions (numbers/symbols) with letters | |
| obfuscation_map = { | |
| '0': 'o', '1': 'i', '3': 'e', '4': 'a', '5': 's', | |
| '7': 't', '@': 'a', '!': 'i', '$': 's', '&': 'a' | |
| } | |
| # Create a normalized version for checking | |
| normalized = message_lower | |
| for char, replacement in obfuscation_map.items(): | |
| normalized = normalized.replace(char, replacement) | |
| # Replace hyphens with spaces to handle hyphenated words like "gago-gago" | |
| normalized = normalized.replace('-', ' ') | |
| # Remove all non-word characters (except spaces) for word boundary checking | |
| message_clean = re.sub(r'[^\w\s]', '', normalized) | |
| # Normalize multiple spaces to single space | |
| message_clean = re.sub(r'\s+', ' ', message_clean).strip() | |
| words = message_clean.split() | |
| # Check for exact word matches in cleaned message | |
| for word in words: | |
| if word in self.bad_words: | |
| return True | |
| # Check for words that start with bad words (handles variations like "fucking" from "fuck") | |
| # Also check the original message for word boundaries | |
| for bad_word in self.bad_words: | |
| # Pattern 1: Word boundary followed by bad word (handles "fuck", "fucking", etc.) | |
| pattern1 = r'\b' + re.escape(bad_word) + r'\w*' | |
| if re.search(pattern1, normalized): | |
| return True | |
| # Pattern 2: Check in cleaned message (handles words with punctuation removed) | |
| if bad_word in message_clean: | |
| # Make sure it's a whole word, not part of another word | |
| pattern2 = r'\b' + re.escape(bad_word) + r'\b' | |
| if re.search(pattern2, message_clean): | |
| return True | |
| return False | |
| def get_profanity_warning(self) -> str: | |
| """Get a polite response when profanity is detected""" | |
| responses = [ | |
| "I understand you might be frustrated, but please keep our conversation respectful. I'm here to help you with any questions or concerns you might have.", | |
| "I appreciate your message, but let's keep our conversation friendly and professional. How can I assist you today?", | |
| "I'm here to help, but I'd prefer we keep our conversation appropriate. Is there something specific you'd like to ask me?", | |
| "Let's maintain a respectful conversation. I'm happy to help you with any questions or information you need." | |
| ] | |
| import random | |
| return random.choice(responses) | |
| def get_greeting_response(self) -> str: | |
| """Generate a greeting response""" | |
| responses = [ | |
| "Hello! I'm your AI assistant. How can I help you today?", | |
| "Hi there! I'm here to assist you with any questions you might have.", | |
| "Hello! Welcome! I can help you with general conversation or answer specific questions from our database.", | |
| "Hey! Nice to meet you! What can I do for you today?" | |
| ] | |
| import random | |
| return random.choice(responses) | |
| def get_help_response(self) -> str: | |
| """Generate a help response""" | |
| return """I'm an AI chatbot that can help you in two ways: | |
| 1. **General Conversation**: I can chat with you about various topics, answer greetings, and have friendly conversations. | |
| 2. **Specific Questions**: I can search our database for specific information and provide detailed answers to your questions. | |
| **Smart Learning**: If I can't find an answer to your question, I'll automatically save it for review so we can improve our knowledge base and provide better answers in the future. | |
| Just type your question or start a conversation, and I'll do my best to help you!""" | |
| def get_thanks_response(self) -> str: | |
| """Generate a thanks response""" | |
| responses = [ | |
| "You're welcome! I'm happy to help.", | |
| "My pleasure! Feel free to ask if you need anything else.", | |
| "Glad I could assist you! Is there anything else you'd like to know?", | |
| "You're very welcome! I'm here whenever you need help." | |
| ] | |
| import random | |
| return random.choice(responses) | |
| def get_goodbye_response(self) -> str: | |
| """Generate a goodbye response""" | |
| responses = [ | |
| "Goodbye! Have a great day!", | |
| "See you later! Take care!", | |
| "Farewell! Feel free to come back anytime.", | |
| "Bye! I enjoyed chatting with you!" | |
| ] | |
| import random | |
| return random.choice(responses) | |
def save_unanswered_question(self, question: str) -> bool:
    """POST *question* to ``<database_url>/unanswered_questions``.

    Progress and failures are reported with ``print``.  Nothing is raised:
    any request error or unexpected exception results in ``False``.

    Args:
        question: The user question that could not be answered.

    Returns:
        True only when the server answers with HTTP 200, otherwise False.
    """
    print(f"Attempting to save unanswered question: '{question}'")
    try:
        endpoint = f"{self.database_url}/unanswered_questions"
        print(f"Using endpoint: {endpoint}")

        # The request body carries the question text only.
        post_data = {"question": question}
        print(f"POST data: {post_data}")

        response = requests.post(
            endpoint,
            json=post_data,
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
        print(f"POST response status: {response.status_code}")
        print(f"POST response text: {response.text}")

        if response.status_code != 200:
            print(f"Failed to save question. Status: {response.status_code}, Response: {response.text}")
            return False

        print(f"Successfully saved question to {endpoint}")
        return True
    except requests.exceptions.RequestException as request_error:
        print(f"Request failed: {request_error}")
        return False
    except Exception as unexpected_error:
        print(f"Unexpected error saving unanswered question: {unexpected_error}")
        return False
| def _get_timestamp(self) -> str: | |
| """Get current timestamp in ISO format""" | |
| from datetime import datetime | |
| return datetime.now().isoformat() | |
| def _normalize_text(self, text: str) -> str: | |
| """Normalize text for better matching""" | |
| # Convert to lowercase | |
| text = text.lower() | |
| # Remove punctuation | |
| text = text.translate(str.maketrans('', '', string.punctuation)) | |
| # Remove extra whitespace | |
| text = ' '.join(text.split()) | |
| # Additional normalization for better matching | |
| # Replace common variations | |
| replacements = { | |
| 'what are the': 'what', | |
| 'what is the': 'what', | |
| 'what are': 'what', | |
| 'what is': 'what', | |
| 'how do i': 'how', | |
| 'how can i': 'how', | |
| 'how to': 'how', | |
| 'when is the': 'when', | |
| 'when are the': 'when', | |
| 'where is the': 'where', | |
| 'where are the': 'where', | |
| 'who is the': 'who', | |
| 'who are the': 'who' | |
| } | |
| for old, new in replacements.items(): | |
| if text.startswith(old): | |
| text = text.replace(old, new, 1) | |
| break | |
| return text | |
| def _extract_keywords(self, text: str) -> List[str]: | |
| """Extract important keywords from text with enhanced processing""" | |
| # Extended stop words to ignore | |
| stop_words = { | |
| 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', | |
| 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', | |
| 'will', 'would', 'could', 'should', 'may', 'might', 'can', 'what', 'how', 'when', 'where', 'why', | |
| 'who', 'which', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they', | |
| 'me', 'him', 'her', 'us', 'them', 'my', 'your', 'his', 'her', 'its', 'our', 'their', | |
| 'there', 'here', 'some', 'any', 'all', 'each', 'every', 'much', 'many', 'more', 'most', | |
| 'very', 'just', 'only', 'also', 'even', 'still', 'yet', 'so', 'too', 'well', 'now', 'then' | |
| } | |
| # Normalize and split into words | |
| words = self._normalize_text(text).split() | |
| # Enhanced keyword extraction | |
| keywords = [] | |
| for word in words: | |
| # Filter out stop words and very short words | |
| if word not in stop_words and len(word) > 2: | |
| # Add the word | |
| keywords.append(word) | |
| # Add common variations and stems | |
| if word.endswith('s') and len(word) > 3: | |
| keywords.append(word[:-1]) # Remove 's' for plurals | |
| if word.endswith('ing') and len(word) > 4: | |
| keywords.append(word[:-3]) # Remove 'ing' | |
| if word.endswith('ed') and len(word) > 3: | |
| keywords.append(word[:-2]) # Remove 'ed' | |
| return list(set(keywords)) # Remove duplicates | |
| def _calculate_similarity(self, text1: str, text2: str) -> float: | |
| """Calculate similarity between two texts using advanced methods""" | |
| norm1 = self._normalize_text(text1) | |
| norm2 = self._normalize_text(text2) | |
| # Method 1: Sequence matcher on normalized text | |
| sequence_similarity = SequenceMatcher(None, norm1, norm2).ratio() | |
| # Method 2: Enhanced keyword overlap with stemming | |
| keywords1 = set(self._extract_keywords(text1)) | |
| keywords2 = set(self._extract_keywords(text2)) | |
| keyword_similarity = 0.0 | |
| if keywords1 and keywords2: | |
| intersection = keywords1.intersection(keywords2) | |
| union = keywords1.union(keywords2) | |
| keyword_similarity = len(intersection) / len(union) if union else 0.0 | |
| # Method 3: Substring containment (both directions) | |
| contains_similarity = 0.0 | |
| if norm1 in norm2: | |
| contains_similarity = max(contains_similarity, 0.9 * (len(norm1) / len(norm2))) | |
| if norm2 in norm1: | |
| contains_similarity = max(contains_similarity, 0.9 * (len(norm2) / len(norm1))) | |
| # Method 4: Enhanced word order similarity | |
| words1 = norm1.split() | |
| words2 = norm2.split() | |
| word_order_similarity = 0.0 | |
| if words1 and words2: | |
| # Check for common word sequences (exact order) | |
| common_sequences = 0 | |
| max_len = min(len(words1), len(words2)) | |
| for i in range(max_len): | |
| if words1[i] == words2[i]: | |
| common_sequences += 1 | |
| exact_order_similarity = common_sequences / max_len if max_len > 0 else 0.0 | |
| # Check for word order flexibility (any order) | |
| set1 = set(words1) | |
| set2 = set(words2) | |
| common_words = set1.intersection(set2) | |
| total_words = set1.union(set2) | |
| flexible_order_similarity = len(common_words) / len(total_words) if total_words else 0.0 | |
| # Check for phrase patterns (like "available courses" vs "courses available") | |
| phrase_similarity = self._calculate_phrase_order_similarity(words1, words2) | |
| # Combine different word order methods | |
| word_order_similarity = ( | |
| exact_order_similarity * 0.3 + | |
| flexible_order_similarity * 0.5 + | |
| phrase_similarity * 0.2 | |
| ) | |
| # Method 5: Semantic similarity using word relationships | |
| semantic_similarity = self._calculate_semantic_similarity(keywords1, keywords2) | |
| # Method 6: Length similarity (shorter queries should match longer answers) | |
| length_similarity = 0.0 | |
| if len(norm1) > 0 and len(norm2) > 0: | |
| length_ratio = min(len(norm1), len(norm2)) / max(len(norm1), len(norm2)) | |
| length_similarity = length_ratio * 0.3 # Lower weight for length | |
| # Method 7: Phrase matching (for common phrases) | |
| phrase_similarity = self._calculate_phrase_similarity(norm1, norm2) | |
| # Combine all methods with optimized weights | |
| final_similarity = ( | |
| sequence_similarity * 0.25 + | |
| keyword_similarity * 0.30 + | |
| contains_similarity * 0.20 + | |
| word_order_similarity * 0.10 + | |
| semantic_similarity * 0.10 + | |
| length_similarity * 0.03 + | |
| phrase_similarity * 0.02 | |
| ) | |
| return min(final_similarity, 1.0) # Cap at 1.0 | |
| def _calculate_semantic_similarity(self, keywords1: set, keywords2: set) -> float: | |
| """Calculate semantic similarity using word relationships""" | |
| if not keywords1 or not keywords2: | |
| return 0.0 | |
| # Common semantic relationships | |
| semantic_groups = { | |
| 'money': {'cost', 'price', 'tuition', 'fee', 'payment', 'money', 'financial', 'aid', 'scholarship'}, | |
| 'time': {'deadline', 'when', 'time', 'date', 'schedule', 'duration', 'period'}, | |
| 'contact': {'contact', 'phone', 'email', 'address', 'office', 'reach', 'call'}, | |
| 'requirements': {'requirement', 'need', 'required', 'must', 'prerequisite', 'condition'}, | |
| 'application': {'apply', 'application', 'submit', 'process', 'procedure'}, | |
| 'programs': {'program', 'course', 'major', 'degree', 'study', 'academic', 'available', 'offered', 'listings'}, | |
| 'admission': {'admission', 'admit', 'accept', 'enroll', 'entry', 'enter'}, | |
| 'courses': {'course', 'courses', 'program', 'programs', 'major', 'majors', 'degree', 'degrees', 'available', 'offered', 'listings', 'what', 'which'} | |
| } | |
| # Check if keywords belong to the same semantic group | |
| semantic_score = 0.0 | |
| for group, words in semantic_groups.items(): | |
| group1_match = any(keyword in words for keyword in keywords1) | |
| group2_match = any(keyword in words for keyword in keywords2) | |
| if group1_match and group2_match: | |
| semantic_score += 0.3 | |
| return min(semantic_score, 1.0) | |
| def _calculate_phrase_similarity(self, text1: str, text2: str) -> float: | |
| """Calculate similarity based on common phrases""" | |
| # Common phrases that should match | |
| common_phrases = [ | |
| ('admission requirements', 'requirements admission'), | |
| ('financial aid', 'aid financial'), | |
| ('tuition cost', 'cost tuition'), | |
| ('application deadline', 'deadline application'), | |
| ('contact admissions', 'admissions contact'), | |
| ('gpa requirement', 'requirement gpa'), | |
| ('academic requirements', 'requirements academic') | |
| ] | |
| phrase_score = 0.0 | |
| for phrase1, phrase2 in common_phrases: | |
| if (phrase1 in text1 and phrase1 in text2) or (phrase2 in text1 and phrase2 in text2): | |
| phrase_score += 0.5 | |
| elif (phrase1 in text1 and phrase2 in text2) or (phrase2 in text1 and phrase1 in text2): | |
| phrase_score += 0.4 | |
| return min(phrase_score, 1.0) | |
| def _calculate_phrase_order_similarity(self, words1: List[str], words2: List[str]) -> float: | |
| """Calculate similarity based on phrase order flexibility""" | |
| if not words1 or not words2: | |
| return 0.0 | |
| # Common phrase patterns that should match regardless of order | |
| phrase_patterns = [ | |
| (['available', 'courses'], ['courses', 'available']), | |
| (['admission', 'requirements'], ['requirements', 'admission']), | |
| (['financial', 'aid'], ['aid', 'financial']), | |
| (['tuition', 'cost'], ['cost', 'tuition']), | |
| (['application', 'deadline'], ['deadline', 'application']), | |
| (['contact', 'admissions'], ['admissions', 'contact']), | |
| (['gpa', 'requirement'], ['requirement', 'gpa']), | |
| (['academic', 'requirements'], ['requirements', 'academic']), | |
| (['programs', 'available'], ['available', 'programs']), | |
| (['what', 'programs'], ['programs', 'what']), | |
| (['what', 'courses'], ['courses', 'what']), | |
| (['what', 'available'], ['available', 'what']) | |
| ] | |
| # Check for phrase pattern matches | |
| for pattern1, pattern2 in phrase_patterns: | |
| # Check if words1 contains pattern1 and words2 contains pattern2 | |
| if (all(word in words1 for word in pattern1) and | |
| all(word in words2 for word in pattern2)): | |
| return 0.8 | |
| # Check if words1 contains pattern2 and words2 contains pattern1 | |
| if (all(word in words1 for word in pattern2) and | |
| all(word in words2 for word in pattern1)): | |
| return 0.8 | |
| # Check for partial phrase matches | |
| for pattern1, pattern2 in phrase_patterns: | |
| # Check if at least 2 words from each pattern are present | |
| words1_matches = sum(1 for word in pattern1 if word in words1) | |
| words2_matches = sum(1 for word in pattern2 if word in words2) | |
| if words1_matches >= 2 and words2_matches >= 2: | |
| return 0.6 | |
| return 0.0 | |
| def _find_best_match(self, user_question: str, database_questions: List[str], threshold: float = 0.25) -> Optional[str]: | |
| """Find the best matching question from database with improved logic""" | |
| if not database_questions: | |
| return None | |
| best_match = None | |
| best_score = 0.0 | |
| all_scores = [] | |
| # Calculate similarity for all questions | |
| for db_question in database_questions: | |
| similarity = self._calculate_similarity(user_question, db_question) | |
| all_scores.append((db_question, similarity)) | |
| if similarity > best_score: | |
| best_score = similarity | |
| best_match = db_question | |
| # Sort by similarity score | |
| all_scores.sort(key=lambda x: x[1], reverse=True) | |
| # If the best score is above threshold, return it | |
| if best_score >= threshold: | |
| return best_match | |
| # If no single match is above threshold, try adaptive threshold | |
| if all_scores: | |
| # Use the top score if it's reasonably close to threshold | |
| top_score = all_scores[0][1] | |
| if top_score >= threshold * 0.8: # 80% of threshold | |
| return all_scores[0][0] | |
| # Last resort: if user question is very short, be more lenient | |
| if len(user_question.split()) <= 3 and all_scores: | |
| # For short queries, use a lower threshold | |
| if all_scores[0][1] >= 0.15: | |
| return all_scores[0][0] | |
| return None | |
| def _generate_query_variants(self, question: str) -> List[str]: | |
| """Generate lightweight query variants to improve matching against FAQs""" | |
| variants: List[str] = [] | |
| original = question.strip() | |
| variants.append(original) | |
| # Normalized | |
| norm = self._normalize_text(original) | |
| variants.append(norm) | |
| # Remove trailing punctuation and repeated spaces already handled by normalize | |
| # Simple lemmatization-ish tweaks for common cases | |
| rules = [ | |
| (r"\btakes\b", "take"), | |
| (r"\btake\b", "takes"), | |
| (r"\bdoes\b", "do"), | |
| (r"\bdo\b", "does"), | |
| (r"\bis\b", "are"), | |
| (r"\bare\b", "is"), | |
| ] | |
| for pattern, repl in rules: | |
| try: | |
| v = re.sub(pattern, repl, norm) | |
| if v not in variants: | |
| variants.append(v) | |
| except Exception: | |
| pass | |
| # Last-word singular/plural toggle | |
| words = norm.split() | |
| if words: | |
| last = words[-1] | |
| if len(last) > 3 and last.endswith('s'): | |
| alt = ' '.join(words[:-1] + [last[:-1]]) | |
| if alt not in variants: | |
| variants.append(alt) | |
| else: | |
| alt = ' '.join(words[:-1] + [last + 's']) | |
| if alt not in variants: | |
| variants.append(alt) | |
| # De-duplicate while preserving order | |
| seen = set() | |
| unique_variants: List[str] = [] | |
| for v in variants: | |
| if v not in seen and v: | |
| seen.add(v) | |
| unique_variants.append(v) | |
| return unique_variants | |
def fetch_from_database(self, question: str) -> str:
    """Look up an answer for *question* and always return a user-facing string.

    Strategy:

    1. Smart matching: fetch the list of known questions, pick the closest
       one and request its answer.
    2. Fallback: try every combination of candidate endpoint, query
       variant, parameter name and HTTP method (GET, then POST) until one
       yields a usable answer.  Failures of single attempts are ignored.
    3. If nothing is found, or an error escapes the steps above, the
       question is stored via ``save_unanswered_question`` and an apology
       is returned.
    """
    def usable_answer(response):
        # Pull an answer out of one HTTP response; None means "keep looking".
        if response.status_code != 200:
            return None
        data = response.json()
        if isinstance(data, dict):
            answer = data.get('answer', data.get('response', str(data)))
        elif isinstance(data, list) and len(data) > 0:
            answer = str(data[0])
        else:
            answer = str(data)
        if answer and answer.strip() and not self._is_no_answer_response(answer):
            return answer
        return None

    try:
        # Step 1: smart matching against the full question list.
        all_questions = self._get_all_questions()
        if all_questions:
            best_match = self._find_best_match(question, all_questions)
            if best_match:
                matched_answer = self._get_answer_for_question(best_match)
                if matched_answer and not self._is_no_answer_response(matched_answer):
                    return matched_answer

        # Step 2: brute-force fallback over endpoints / variants / params.
        endpoints = [
            f"{self.database_url}/faqs",
            f"{self.database_url}/faq",
            f"{self.database_url}/search",
            f"{self.database_url}/query",
            f"{self.database_url}/api/faq",
        ]
        param_names = ["question", "q"]
        variants = self._generate_query_variants(question)

        # (sender, keyword carrying the payload, extra keyword arguments)
        transports = (
            (requests.get, "params", {}),
            (requests.post, "json", {"headers": {"Content-Type": "application/json"}}),
        )

        for endpoint in endpoints:
            for variant in variants:
                for param_name in param_names:
                    payload = {param_name: variant}
                    for send, payload_keyword, extra in transports:
                        try:
                            response = send(endpoint, timeout=10, **{payload_keyword: payload}, **extra)
                            found = usable_answer(response)
                        except Exception:
                            # Best effort: a failing attempt just moves on to the next one.
                            continue
                        if found is not None:
                            return found

        # Step 3: nothing found -- remember the question for later review.
        if self.save_unanswered_question(question):
            return "I'm sorry, I couldn't find a specific answer to your question in our database. I've saved your question for review, and we'll work on providing a better answer in the future. Could you try rephrasing your question or ask me something else?"
        return "I'm sorry, I couldn't find a specific answer to your question in our database. I tried to save your question for review, but there was an issue with our database connection. Could you try rephrasing your question or ask me something else?"

    except requests.exceptions.Timeout:
        # Still try to record the question.
        if self.save_unanswered_question(question):
            return "I'm sorry, the database is taking too long to respond. I've saved your question for review. Please try again in a moment."
        return "I'm sorry, the database is taking too long to respond. Please try again in a moment."

    except requests.exceptions.ConnectionError:
        # Still try to record the question.
        if self.save_unanswered_question(question):
            return "I'm sorry, I'm having trouble connecting to our database right now. I've saved your question for review. Please try again later."
        return "I'm sorry, I'm having trouble connecting to our database right now. Please try again later."

    except Exception as error:
        # Still try to record the question.
        if self.save_unanswered_question(question):
            return f"I encountered an error while searching our database: {error!s}. I've saved your question for review. Please try again."
        return f"I encountered an error while searching our database: {error!s}. Please try again."
def _get_all_questions(self) -> List[str]:
    """Fetch the list of known questions from the backend, best effort.

    Several candidate endpoints are tried in order.  The first one that
    answers HTTP 200 with either a JSON list, or a JSON object holding a
    ``questions`` entry, supplies the result (each item converted with
    ``str``).  An endpoint that fails in any way is skipped.

    Failures are caught with ``except Exception`` rather than a bare
    ``except`` so that ``KeyboardInterrupt`` and ``SystemExit`` are no
    longer swallowed.

    Returns:
        The questions as strings, or an empty list when no endpoint
        produced a usable response.
    """
    paths = ("/questions", "/faq/all", "/api/questions", "/all_questions")
    for path in paths:
        try:
            response = requests.get(f"{self.database_url}{path}", timeout=10)
            if response.status_code != 200:
                continue
            data = response.json()
            if isinstance(data, list):
                return [str(item) for item in data]
            if isinstance(data, dict) and 'questions' in data:
                return [str(item) for item in data['questions']]
        except Exception:
            # Network error, invalid JSON or unexpected payload shape:
            # move on to the next candidate endpoint.
            continue
    return []
def _get_answer_for_question(self, question: str) -> Optional[str]:
    """Look up the answer for one question by probing the database API.

    For every candidate endpoint, every query variant produced by
    ``self._generate_query_variants`` and every parameter name
    (``question``, then ``q``), a GET request is tried first and a JSON
    POST second. The first HTTP 200 response with a decodable JSON body
    decides the result.

    Args:
        question: The question text to look up.

    Returns:
        For a JSON object: its ``answer`` value, else its ``response``
        value, else the object rendered with ``str``. For a non-empty JSON
        list: ``str`` of its first element. For any other JSON value:
        ``str`` of that value. ``None`` when no request succeeds or the
        query variants cannot be generated.
    """
    try:
        variants = list(self._generate_query_variants(question))
    except Exception:
        # The variant generator is defined elsewhere in the class; the
        # lookup has always been best-effort, so a failure there still
        # means "no answer" rather than a crash.
        return None

    endpoints = tuple(
        f"{self.database_url}{path}"
        for path in ("/faqs", "/faq", "/search", "/query", "/api/faq")
    )
    param_names = ("question", "q")
    json_headers = {"Content-Type": "application/json"}

    for endpoint in endpoints:
        for variant in variants:
            for param_name in param_names:
                payload = {param_name: variant}
                # Same order as before: query-string GET, then JSON POST.
                attempts = (
                    (requests.get, {"params": payload}),
                    (requests.post, {"json": payload, "headers": json_headers}),
                )
                for send, request_kwargs in attempts:
                    try:
                        response = send(endpoint, timeout=10, **request_kwargs)
                        if response.status_code != 200:
                            continue
                        data = response.json()
                    except (requests.exceptions.RequestException, ValueError):
                        # Network failure or non-JSON body: move on to the
                        # next attempt instead of aborting the whole search.
                        continue

                    if isinstance(data, dict):
                        return data.get("answer", data.get("response", str(data)))
                    if isinstance(data, list) and data:
                        return str(data[0])
                    return str(data)
    return None
| def _is_no_answer_response(self, answer: str) -> bool: | |
| """Check if the response indicates no answer was found""" | |
| no_answer_indicators = [ | |
| "no answer", | |
| "not found", | |
| "no results", | |
| "no data", | |
| "empty", | |
| "null", | |
| "none", | |
| "i don't know", | |
| "i don't have", | |
| "cannot find", | |
| "unable to find" | |
| ] | |
| answer_lower = answer.lower().strip() | |
| return any(indicator in answer_lower for indicator in no_answer_indicators) | |
def chat(self, message: str, history: List[List[str]]) -> str:
    """Produce the bot's reply to a single user message.

    A blank message gets a prompt to type something and is not recorded.
    A message flagged by ``contains_profanity`` is answered with the
    profanity warning and recorded as ``"[Filtered]"`` instead of its
    text. Any other message is recorded, then answered by the first
    matching conversational pattern (greeting, help request, thanks,
    goodbye, in that order) or, failing all of those, by a database
    lookup. The reply is appended to ``self.conversation_history``.

    Args:
        message: The text the user typed.
        history: Chat history supplied by the caller; this method does not
            read it.

    Returns:
        The reply text.
    """
    if not message.strip():
        return "Please enter a message so I can help you!"

    # Profanity short-circuits everything else; the raw text is never stored.
    if self.contains_profanity(message):
        warning = self.get_profanity_warning()
        self.conversation_history.append(("user", "[Filtered]"))
        self.conversation_history.append(("bot", warning))
        return warning

    self.conversation_history.append(("user", message))

    # Ordered (predicate, responder) pairs; the first predicate that
    # accepts the message wins.
    pattern_handlers = (
        (self.is_greeting, self.get_greeting_response),
        (self.is_help_request, self.get_help_response),
        (self.is_thanks, self.get_thanks_response),
        (self.is_goodbye, self.get_goodbye_response),
    )
    for matches, respond in pattern_handlers:
        if matches(message):
            reply = respond()
            break
    else:
        # No conversational pattern matched: consult the database.
        reply = self.fetch_from_database(message)

    self.conversation_history.append(("bot", reply))
    return reply
# Module-level chatbot instance, built with the default database URL;
# create_interface() passes its chat() method to the Gradio chat widget.
chatbot = AIChatbot()
# Create Gradio interface
def create_interface():
    """Build the Gradio UI for the chatbot.

    The layout is an introductory Markdown banner, a ``gr.ChatInterface``
    bound to the module-level ``chatbot.chat`` callback, and a Markdown
    footer, all placed inside one ``gr.Blocks`` container.

    Returns:
        The ``gr.Blocks`` container. It is not launched here.
    """
    # The Markdown text below is written flush-left so the strings carry no
    # leading indentation.
    with gr.Blocks(title="AI Chatbot") as interface:
        gr.Markdown(
            """
# 🤖 AI Chatbot Assistant
Welcome! I'm your AI assistant that can help you with:
- **General conversation** and friendly chat
- **Specific questions** answered from our knowledge database
Just type your message below and I'll do my best to help you!
"""
        )

        # Chat interface. Creating the component inside the Blocks context
        # is what adds it to the layout, so the result is not kept in a
        # local variable.
        gr.ChatInterface(
            fn=chatbot.chat,
            title="Chat with AI Assistant",
            description="Ask me anything or just have a conversation!",
            examples=[
                "Hello!",
                "What can you help me with?",
                "How do I contact support?",
                "What are your services?",
                "Thank you for your help!",
            ],
            cache_examples=False,
        )

        # Footer
        gr.Markdown(
            """
---
**Note**: This chatbot can handle general conversation and search our database for specific information.
If you don't get the answer you're looking for, try rephrasing your question!
"""
        )
    return interface
# Launch the application
if __name__ == "__main__":
    # Server settings, collected in one place and handed to launch().
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
    }
    interface = create_interface()
    interface.launch(**launch_options)