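# EventMatcher: fetches upcoming Brock University events from the ExperienceBU
# RSS feed, matches them against free-text queries with fuzzy scoring, and
# caches results to events_cache.json.
# Assumed dependencies: feedparser, beautifulsoup4, pytz, transformers,
# sentencepiece (needed by T5Tokenizer), fuzzywuzzy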
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
import feedparser
from bs4 import BeautifulSoup
import pytz
import os
import json
from transformers import T5Tokenizer, T5ForConditionalGeneration
from fuzzywuzzy import fuzz

class EventMatcher:
    def __init__(self):
        """Initialize the event matcher."""
        print("Initializing Event Matcher...")
        self.eastern = pytz.timezone('America/New_York')
        self.events = []
        self.last_update = None
        self.cache_file = "events_cache.json"
        # Initialize T5 model for response generation. Note: query() currently
        # formats responses without T5 for speed, so the model is loaded but
        # not yet used.
        self.tokenizer = T5Tokenizer.from_pretrained("t5-small")
        self.model = T5ForConditionalGeneration.from_pretrained("t5-small")
        # Load initial events
        self.load_from_cache()
        self.update_events()

    def query(self, user_query: str) -> str:
        """Main query method called by the app."""
        try:
            # Refresh events if the cached data is stale (older than one hour).
            # This runs synchronously; it could be moved to a background thread
            # to avoid blocking. total_seconds() is used because
            # timedelta.seconds wraps at one day.
            if self.last_update is None or (datetime.now() - self.last_update).total_seconds() >= 3600:
                self.update_events()
            # Quick response for an empty query
            if not user_query.strip():
                return "Please ask me about events at Brock University!"
            # Find matching events; the initial search is pre-filtered for speed
            matched_events = self.find_matching_events(user_query)
            if not matched_events:
                return "I couldn't find any events matching your query. Try asking in a different way!"
            # Format the response without T5, for a faster response
| events_text = "" | |
| for event, score in matched_events: | |
| events_text += f""" | |
| π {event['title']} | |
| π {event['start_time'].strftime('%A, %B %d, %Y')} | |
| β° {event['start_time'].strftime('%I:%M %p')} | |
| π {event['location']} | |
| π·οΈ {event['categories']} | |
| π₯ {event['hosts']} | |
| {event['description'][:200]}... | |
| π {event['link']} | |
| """ | |
            # Create a simple response prefix based on the query type
            if "today" in user_query.lower():
                prefix = "Here are today's events:"
            elif "week" in user_query.lower():
                prefix = "Here are the events happening this week:"
            elif any(word in user_query.lower() for word in ["workshop", "training", "seminar"]):
                prefix = "I found these workshops and seminars:"
            elif any(word in user_query.lower() for word in ["faculty", "department", "school"]):
                prefix = "Here are the faculty-related events:"
            else:
                prefix = "Here are some events that match your query:"
            return f"{prefix}\n{events_text}"
        except Exception as e:
            print(f"Error in query: {e}")
            return "I encountered an error processing your query. Please try again!"

    def find_matching_events(self, query: str) -> List[Tuple[Dict, float]]:
        """Find events matching the query, running fuzzy scoring only on candidates."""
        matched_events = []
        query_lower = query.lower()
        for event in self.events:
            # Quick substring filter before the more expensive fuzzy matching
            if any(term in event['title'].lower() or
                   term in event['description'].lower()[:200] or
                   term in event['location'].lower() or
                   term in event['categories'].lower()
                   for term in query_lower.split()):
                # Calculate similarity scores only for potentially matching events
                title_score = fuzz.token_set_ratio(query_lower, event['title'].lower()) / 100
                desc_score = fuzz.token_set_ratio(query_lower, event['description'].lower()) / 100
                location_score = fuzz.token_set_ratio(query_lower, event['location'].lower()) / 100
                categories_score = fuzz.token_set_ratio(query_lower, event['categories'].lower()) / 100
                # Weight the scores, favouring title matches
                total_score = (
                    title_score * 0.4 +
                    desc_score * 0.3 +
                    location_score * 0.2 +
                    categories_score * 0.1
                )
                if total_score > 0.3:  # Threshold for relevance
                    matched_events.append((event, total_score))
        # Sort by score and keep the top three matches
        matched_events.sort(key=lambda x: x[1], reverse=True)
        return matched_events[:3]
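
    # Worked example of the weighting above (hypothetical scores): a query that
    # matches an event title strongly (title_score = 0.9) but little else
    # (desc_score = 0.2, location and categories 0.0) yields
    # 0.9 * 0.4 + 0.2 * 0.3 = 0.42, which clears the 0.3 relevance threshold.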

    def parse_event_datetime(self, entry) -> Tuple[Optional[datetime], Optional[datetime]]:
        """Parse event dates from the RSS feed."""
        try:
            start_time = entry.get('start', None)
            end_time = entry.get('end', None)
            # Try the RSS feed times first
            if start_time:
                start_dt = datetime.strptime(start_time, '%a, %d %b %Y %H:%M:%S %Z')
                start_dt = pytz.UTC.localize(start_dt).astimezone(self.eastern)
            else:
                # Fall back to HTML parsing if RSS times are not available
                soup = BeautifulSoup(entry.description, 'html.parser')
                start_elem = soup.find('time', class_='dt-start')
                if start_elem and 'datetime' in start_elem.attrs:
                    dt_str = start_elem['datetime'].split('.')[0]
                    start_dt = datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S')
                    start_dt = self.eastern.localize(start_dt)
                else:
                    return None, None
            if end_time:
                end_dt = datetime.strptime(end_time, '%a, %d %b %Y %H:%M:%S %Z')
                end_dt = pytz.UTC.localize(end_dt).astimezone(self.eastern)
            else:
                end_dt = None
            return start_dt, end_dt
        except Exception as e:
            print(f"Error parsing dates: {e}")
            return None, None
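
    # The RSS branch expects RFC 822-style dates such as
    # "Mon, 13 Jan 2025 14:00:00 GMT". Note that strptime's %Z only matches
    # timezone names like GMT/UTC; a numeric offset (e.g. "+0000") would raise
    # and land in the except block above.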

    def update_events(self) -> None:
        """Update events from the RSS feed, with caching."""
        try:
            # Skip the fetch if the cache is less than one hour old
            # (total_seconds() rather than .seconds, which wraps at one day)
            if self.last_update and (datetime.now() - self.last_update).total_seconds() < 3600:
                return
            print("Fetching events from RSS feed...")
            feed = feedparser.parse("https://experiencebu.brocku.ca/events.rss")
            new_events = []
            for entry in feed.entries:
                event = self.process_event_entry(entry)
                if event:
                    new_events.append(event)
            if new_events:
                self.events = new_events
                self.last_update = datetime.now()
                print(f"Updated {len(self.events)} events")
                # Save to cache
                self.save_to_cache()
        except Exception as e:
            print(f"Error updating events: {e}")
            self.load_from_cache()

    def process_event_entry(self, entry) -> Optional[Dict]:
        """Process a single event entry into an event dict."""
        try:
            # Parse dates and drop events outside the validity window
            start_time, end_time = self.parse_event_datetime(entry)
            if not self.is_event_valid(start_time):
                return None
            # Extract event details
            categories = self.extract_categories(entry)
            hosts = self.extract_hosts(entry)
            description = self.clean_description(entry.description)
            return {
                'title': entry.title,
                'description': description,
                'start_time': start_time,
                'end_time': end_time,
                'location': entry.get('location', 'Location not specified'),
                'categories': ';'.join(categories),
                'hosts': ';'.join(hosts),
                'link': entry.link,
                'guid': entry.guid
            }
        except Exception as e:
            print(f"Error processing event entry: {e}")
            return None

    # Helper methods
    def extract_categories(self, entry) -> List[str]:
        try:
            return [tag.term for tag in entry.get('tags', [])]
        except Exception:
            return []

    def extract_hosts(self, entry) -> List[str]:
        try:
            hosts = entry.get('host', [])
            if not isinstance(hosts, list):
                hosts = [hosts]
            return [h for h in hosts if h]
        except Exception:
            return []

    def clean_description(self, description: str) -> str:
        try:
            soup = BeautifulSoup(description, 'html.parser')
            return ' '.join(soup.get_text().split())
        except Exception:
            return description

    def is_event_valid(self, start_time: Optional[datetime]) -> bool:
        if not start_time:
            return False
        now = datetime.now(self.eastern)
        two_weeks = now + timedelta(days=14)
        return now <= start_time <= two_weeks
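
    # Example: with now = 2025-01-10, an event starting 2025-01-20 falls inside
    # the 14-day window and is kept, while one starting 2025-02-01 is dropped.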

    # Cache handling methods
    def save_to_cache(self) -> None:
        try:
            cache_data = {
                'last_update': self.last_update.isoformat(),
                'events': []
            }
            for event in self.events:
                # Datetimes must be serialized as ISO strings for JSON
                event_copy = event.copy()
                event_copy['start_time'] = event_copy['start_time'].isoformat()
                if event_copy.get('end_time'):
                    event_copy['end_time'] = event_copy['end_time'].isoformat()
                cache_data['events'].append(event_copy)
            with open(self.cache_file, 'w') as f:
                json.dump(cache_data, f)
        except Exception as e:
            print(f"Error saving to cache: {e}")

    def load_from_cache(self) -> None:
        try:
            if not os.path.exists(self.cache_file):
                return
            with open(self.cache_file, 'r') as f:
                cache_data = json.load(f)
            self.last_update = datetime.fromisoformat(cache_data['last_update'])
            self.events = []
            for event in cache_data['events']:
                # Restore datetimes from their ISO-string form
                event['start_time'] = datetime.fromisoformat(event['start_time'])
                if event.get('end_time'):
                    event['end_time'] = datetime.fromisoformat(event['end_time'])
                self.events.append(event)
            print(f"Loaded {len(self.events)} events from cache")
        except Exception as e:
            print(f"Error loading from cache: {e}")