Spaces:
Build error
Build error
| # app.py | |
import gc
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import chromadb
import feedparser
import gradio as gr
import pytz
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
class BrockEventsRAG:
    """Retrieval helper over the Brock University ExperienceBU events feed.

    On construction the class loads a sentence-embedding model and a small
    seq2seq model, opens a ChromaDB collection, downloads the events RSS feed
    and indexes every event that starts inside the look-ahead window.
    ``generate_response`` then answers a free-text question from the closest
    indexed events.
    """

    FEED_URL = "https://experiencebu.brocku.ca/events.rss"
    COLLECTION_NAME = "brock_events"
    COLLECTION_METADATA = {"description": "Brock University Events Database"}
    WINDOW_DAYS = 14
    RSS_TIME_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'
    HTML_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
    CACHE_MAX_AGE_SECONDS = 86400  # 24 hours

    def __init__(self):
        """Load the models, open the vector store and index the current feed."""
        # The file's import block never brought these names into scope, so the
        # original constructor failed with NameError before doing any work.
        from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

        # One embedding model is enough: the former ``self.embeddings``
        # (HuggingFaceEmbeddings) was never read and loaded the same weights
        # a second time.
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

        # ChromaDB client setup
        self.chroma_client = self._create_chroma_client()

        # LLM model setup
        self.tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-small")
        self.llm = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-small")

        # Date window used to filter events
        self.eastern = pytz.timezone('America/New_York')
        self._refresh_date_window()

        # Cache directory setup
        os.makedirs("cache", exist_ok=True)
        self.cache_file = "cache/events_cache.json"

        # Start from an empty collection, then load the initial events
        self.collection = self._reset_collection()
        self.update_database()

    @staticmethod
    def _create_chroma_client():
        """Return a ChromaDB client that stores its data under ``chroma_db``."""
        # Newer Chroma releases expose PersistentClient and no longer accept
        # the legacy ``chroma_db_impl`` setting; older ones only know the
        # Settings-based form used originally.
        if hasattr(chromadb, "PersistentClient"):
            return chromadb.PersistentClient(path="chroma_db")
        from chromadb.config import Settings
        return chromadb.Client(
            Settings(persist_directory="chroma_db", chroma_db_impl="duckdb+parquet")
        )

    def _reset_collection(self):
        """Create the events collection, replacing any existing one of that name."""
        try:
            return self.chroma_client.create_collection(
                name=self.COLLECTION_NAME,
                metadata=self.COLLECTION_METADATA,
            )
        except Exception:
            # Creation fails when the name is already taken; drop the old
            # collection and create it again so no stale rows survive.
            self.chroma_client.delete_collection(self.COLLECTION_NAME)
            return self.chroma_client.create_collection(
                name=self.COLLECTION_NAME,
                metadata=self.COLLECTION_METADATA,
            )

    def _refresh_date_window(self):
        """Recompute ``today`` (Eastern midnight) and the end of the window."""
        self.today = datetime.now(self.eastern).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        self.date_range_end = self.today + timedelta(days=self.WINDOW_DAYS)

    def fetch_rss_feed(self, url: str) -> List[Dict]:
        """Fetch and parse the RSS feed at ``url``.

        Returns the list of feed entries, or an empty list if parsing raised.
        """
        try:
            entries = feedparser.parse(url).entries
            print(f"Fetched {len(entries)} entries from the feed.")
            return entries
        except Exception as e:
            print(f"Error fetching RSS feed: {e}")
            return []

    def _parse_rss_time(self, value):
        """Parse an RSS timestamp string into an Eastern-time datetime, or None."""
        if not value:
            return None
        try:
            parsed = datetime.strptime(value, self.RSS_TIME_FORMAT)
        except (TypeError, ValueError):
            return None
        # As in the original code, the feed time is treated as UTC.
        return pytz.UTC.localize(parsed).astimezone(self.eastern)

    def _parse_html_time(self, element):
        """Parse a ``<time datetime=...>`` element into an Eastern datetime, or None."""
        if element is None or 'datetime' not in element.attrs:
            return None
        # Drop everything from the first '.' on (fractional seconds and beyond).
        stamp = element['datetime'].split('.')[0]
        try:
            parsed = datetime.strptime(stamp, self.HTML_TIME_FORMAT)
        except ValueError:
            return None
        return self.eastern.localize(parsed)

    def parse_event_datetime(self, entry) -> tuple:
        """Return ``(start, end)`` datetimes for a feed entry.

        The entry's ``start``/``end`` fields are tried first. If either is
        missing or unparseable, ``<time class="dt-start">`` and
        ``<time class="dt-end">`` elements in the description HTML are used
        where present. Either item may be None; ``(None, None)`` is returned
        on unexpected errors.
        """
        try:
            start_dt = self._parse_rss_time(entry.get('start', None))
            end_dt = self._parse_rss_time(entry.get('end', None))

            # If we didn't get both times from RSS, try the HTML description.
            if not start_dt or not end_dt:
                soup = BeautifulSoup(entry.get('description') or '', 'html.parser')
                html_start = self._parse_html_time(soup.find('time', class_='dt-start'))
                html_end = self._parse_html_time(soup.find('time', class_='dt-end'))
                if html_start is not None:
                    start_dt = html_start
                if html_end is not None:
                    end_dt = html_end
            return start_dt, end_dt
        except Exception as e:
            print(f"Error parsing dates: {e}")
            return None, None

    def get_location(self, entry) -> str:
        """Return the event location from the entry, falling back to its HTML.

        Returns ``"Location not specified"`` when neither source has one.
        """
        try:
            location = entry.get('location', None)
            if not location:
                soup = BeautifulSoup(entry.get('description') or '', 'html.parser')
                location_elem = soup.find('span', class_='p-location')
                if location_elem:
                    location = location_elem.get_text().strip()
            return location if location else "Location not specified"
        except Exception as e:
            print(f"Error getting location: {e}")
            return "Location not specified"

    def process_event(self, entry) -> Optional[Dict]:
        """Convert a feed entry into an event dict.

        Returns None when the entry has no start time, starts outside the
        date window, or cannot be processed.
        """
        try:
            start_time, end_time = self.parse_event_datetime(entry)
            # Skip if event is not in our date range
            if not start_time or not self.is_event_in_range(start_time):
                return None

            categories = [tag.term for tag in entry.get('tags', [])]
            categories_str = '; '.join(categories) if categories else 'No categories'

            hosts = entry.get('host', [])
            if not isinstance(hosts, list):
                hosts = [hosts]
            # str() keeps one non-string host from discarding the whole event.
            hosts_str = '; '.join(str(host) for host in hosts) if hosts else 'No host specified'

            # Strip markup and collapse whitespace in the description.
            soup = BeautifulSoup(entry.description, 'html.parser')
            description = ' '.join(soup.get_text().split())

            return {
                'title': entry.title,
                'start_time': start_time,
                'end_time': end_time,
                'location': self.get_location(entry),
                'categories': categories_str,
                'hosts': hosts_str,
                'description': description,
                'link': entry.link,
                'guid': entry.guid,
            }
        except Exception as e:
            print(f"Error processing event {entry.get('title', 'Unknown')}: {e}")
            return None

    def is_event_in_range(self, event_time: datetime) -> bool:
        """Return True when ``event_time`` lies within ``[today, date_range_end]``."""
        if not event_time:
            return False
        return self.today <= event_time <= self.date_range_end

    def format_event_text(self, event: Dict) -> str:
        """Render an event dict as the text block that gets embedded.

        The description is truncated to its first 500 characters.
        """
        start = event['start_time']
        end = event['end_time']
        end_label = end.strftime('%I:%M %p') if end else 'not specified'
        lines = [
            "",
            f"Event: {event['title']}",
            f"Date: {start.strftime('%A, %B %d, %Y')}",
            f"Time: {start.strftime('%I:%M %p')} to {end_label}",
            f"Location: {event['location']}",
            f"Categories: {event['categories']}",
            f"Hosted by: {event['hosts']}",
            f"Description: {event['description'][:500]}",
            "",
        ]
        return "\n".join(lines)

    def update_database(self):
        """Re-download the feed and rebuild the collection from in-range events.

        If no event falls inside the window the existing collection is left
        untouched. The processed events are also written to the JSON cache.
        """
        print("Fetching events...")
        # Recompute the window so a refresh long after start-up still filters
        # against the current day.
        self._refresh_date_window()

        entries = self.fetch_rss_feed(self.FEED_URL)
        print(f"Found {len(entries)} total events")

        # process_event returns None for out-of-range or broken entries.
        valid_events = [event for event in map(self.process_event, entries) if event]
        print(f"Found {len(valid_events)} events in the next {self.WINDOW_DAYS} days")
        if not valid_events:
            print("No events found in date range")
            return

        documents = [self.format_event_text(event) for event in valid_events]
        metadatas = [{
            'title': event['title'],
            'date': event['start_time'].strftime('%Y-%m-%d'),
            'time': event['start_time'].strftime('%I:%M %p'),
            'location': event['location'],
            'categories': event['categories'],
            'link': event['link'],
        } for event in valid_events]
        ids = [f"event_{i}" for i in range(len(valid_events))]

        try:
            embeddings = self.model.encode(documents)
            # Ids are positional, so start from an empty collection: adding
            # onto the previous contents would collide with the old
            # event_0..n rows and keep events that have left the feed.
            self.collection = self._reset_collection()
            self.collection.add(
                documents=documents,
                embeddings=embeddings.tolist(),
                metadatas=metadatas,
                ids=ids,
            )
            print(f"Successfully added {len(valid_events)} events to database")
        except Exception as e:
            print(f"Error adding events to database: {e}")

        self.save_cache({
            'last_update': datetime.now().isoformat(),
            'events': valid_events,
        })

        # Clean up
        gc.collect()

    def query(self, question: str, n_results: int = 3) -> Optional[Dict]:
        """Embed ``question`` and return the collection's query result.

        The result holds ``documents``, ``metadatas`` and ``distances`` for
        up to ``n_results`` matches. Returns None if the query raised.
        """
        try:
            question_embedding = self.model.encode(question)
            return self.collection.query(
                query_embeddings=[question_embedding.tolist()],
                n_results=n_results,
                include=['documents', 'metadatas', 'distances'],
            )
        except Exception as e:
            print(f"Error during query: {e}")
            return None

    def generate_response_with_llm(self, events: List[Dict]) -> str:
        """Ask the seq2seq model for a short natural-language summary of ``events``.

        Each event dict must provide ``title``, ``start_time`` and
        ``location``. Returns a fixed apology string if generation fails.
        """
        try:
            if not events:
                input_text = "There are no events matching the query. How should I respond?"
            else:
                event_summaries = "\n".join(
                    f"Event: {event['title']}. Start: {event['start_time']}, Location: {event['location']}."
                    for event in events
                )
                input_text = f"Format this information into a friendly response: {event_summaries}"
            inputs = self.tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True)
            outputs = self.llm.generate(**inputs)
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        except Exception as e:
            print(f"Error generating response: {e}")
            return "Sorry, I couldn't generate a response."

    def generate_response(self, question: str, history: list) -> str:
        """Answer ``question`` with an LLM summary plus the top matching events.

        ``history`` is accepted for the chat callback signature but is not
        used when building the answer.
        """
        try:
            results = self.query(question)
            if not results or not results['documents'] or not results['documents'][0]:
                return "I couldn't find any events matching your query. Try asking about upcoming events in a different way!"

            top_matches = results['metadatas'][0][:3]

            # Build the dicts generate_response_with_llm expects from the
            # stored metadata (the original referenced an undefined name here).
            matched_events = [{
                'title': metadata['title'],
                'start_time': f"{metadata['date']} at {metadata['time']}",
                'location': metadata['location'],
            } for metadata in top_matches]
            response = self.generate_response_with_llm(matched_events)
            response += "\n\n"

            # Add top 3 matching events
            for i, metadata in enumerate(top_matches, 1):
                response += f"{i}. **{metadata['title']}**\n"
                response += f"📅 {metadata['date']} at {metadata['time']}\n"
                response += f"📍 {metadata['location']}\n"
                if 'categories' in metadata:
                    response += f"🏷️ {metadata['categories']}\n"
                response += f"🔗 More info: {metadata['link']}\n\n"

            # Add a helpful prompt
            response += "\nYou can ask me for more specific details about any of these events!"
            return response
        except Exception as e:
            print(f"Error generating response: {e}")
            return "I encountered an error while searching for events. Please try asking in a different way."

    def save_cache(self, data: dict):
        """Write ``data`` (``last_update`` plus ``events``) to the JSON cache file.

        ``start_time``/``end_time`` datetimes are stored as ISO-8601 strings.
        Failures are printed, not raised.
        """
        try:
            serializable_data = {
                'last_update': data['last_update'],
                'events': [],
            }
            for event in data['events']:
                event_copy = event.copy()
                # datetime objects are not JSON serializable
                for key in ('start_time', 'end_time'):
                    if event_copy.get(key):
                        event_copy[key] = event_copy[key].isoformat()
                serializable_data['events'].append(event_copy)
            with open(self.cache_file, 'w', encoding='utf-8') as f:
                json.dump(serializable_data, f, ensure_ascii=False, indent=2)
            print(f"Cache saved successfully to {self.cache_file}")
        except Exception as e:
            print(f"Error saving cache: {e}")

    def load_cache(self) -> dict:
        """Load the JSON cache, converting stored timestamps back to datetimes.

        Returns ``{'last_update': None, 'events': []}`` when the file is
        missing or unreadable.
        """
        try:
            if os.path.exists(self.cache_file):
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                for event in data['events']:
                    for key in ('start_time', 'end_time'):
                        if event.get(key):
                            event[key] = datetime.fromisoformat(event[key])
                return data
            return {'last_update': None, 'events': []}
        except Exception as e:
            print(f"Error loading cache: {e}")
            return {'last_update': None, 'events': []}

    def should_update_cache(self) -> bool:
        """Return True when the cache is absent, unreadable or older than 24 hours."""
        try:
            cached_data = self.load_cache()
            if not cached_data['last_update']:
                return True
            last_update = datetime.fromisoformat(cached_data['last_update'])
            time_since_update = datetime.now() - last_update
            return time_since_update.total_seconds() > self.CACHE_MAX_AGE_SECONDS
        except Exception as e:
            print(f"Error checking cache: {e}")
            return True
def create_demo():
    """Build and return the Gradio Blocks UI for the events assistant.

    Constructs a ``BrockEventsRAG`` instance and wires a chat pane, a question
    box, and Ask / Clear Chat / Refresh Events buttons to it.
    """
    # Initialize the RAG system
    rag_system = BrockEventsRAG()

    # Custom CSS for better appearance
    custom_css = """
    .gr-button-primary {
        background-color: #8b0000 !important;
        border-color: #8b0000 !important;
    }
    """

    # Markdown is assembled from explicit lines so its rendering does not
    # depend on how this function happens to be indented.
    intro_markdown = "\n".join([
        "# 🎓 Brock University Events Assistant",
        "Ask me about upcoming events at Brock! I can help you discover:",
        "- Academic workshops",
        "- Student activities",
        "- Campus events",
        "- And more!",
    ])
    tips_markdown = "\n".join([
        "### Tips:",
        "- Ask about specific dates, locations, or event types",
        "- You can refresh the events database using the button above",
        "- Click on event links to get more details on ExperienceBU",
        "",
        "Data is refreshed automatically every 24 hours. Events shown are for the next 14 days.",
    ])

    # Create the Gradio interface
    with gr.Blocks(css=custom_css) as demo:
        gr.Markdown(intro_markdown)

        # A single chat pane. The original created a second Chatbot further
        # down, which rendered two panes and left this one unconnected.
        # type="messages" matches the role/content dicts that respond() appends.
        chatbot = gr.Chatbot(
            label="Chat History",
            height=400,
            bubble_full_width=False,
            type="messages",
        )

        with gr.Row():
            msg = gr.Textbox(
                label="Your Question",
                placeholder="e.g., What events are happening this week?",
                scale=4,
            )
            submit = gr.Button("Ask", scale=1, variant="primary")

        with gr.Row():
            clear = gr.Button("Clear Chat")
            refresh = gr.Button("Refresh Events")

        # Event handlers
        def respond(message, history):
            """Append the question and the assistant's answer to the history."""
            # history can be None once the chat has been cleared
            history = list(history or [])
            if not message or not message.strip():
                return "", history
            bot_message = rag_system.generate_response(message, history)
            history.append({"role": "user", "content": message})
            history.append({"role": "assistant", "content": bot_message})
            return "", history

        def refresh_events():
            """Rebuild the events database and return a status message."""
            rag_system.update_database()
            return "Events database has been refreshed!"

        submit.click(respond, [msg, chatbot], [msg, chatbot])
        msg.submit(respond, [msg, chatbot], [msg, chatbot])
        clear.click(lambda: None, None, chatbot)
        # NOTE(review): the status text lands in the question box, as in the
        # original wiring - confirm whether a separate status area is wanted.
        refresh.click(refresh_events, None, msg)

        # Example questions
        gr.Examples(
            examples=[
                "What events are happening this week?",
                "Are there any workshops in the library?",
                "Tell me about upcoming career events",
                "What's happening in the MakerSpace?",
                "Any student club meetings soon?",
            ],
            inputs=msg,
        )

        gr.Markdown(tips_markdown)

    return demo
if __name__ == "__main__":
    # Launch settings for the hosted app.
    launch_options = {
        "server_name": "0.0.0.0",  # Required for Spaces
        "server_port": 7860,       # Default port
        "share": False,            # Don't create a public link
        "max_threads": 40,         # Handle concurrent users
    }
    demo = create_demo()
    demo.launch(**launch_options)