AryanJh's picture
LLM Google T5 integration
9e6b464 verified
raw
history blame
18.4 kB
# app.py
import gradio as gr
import feedparser
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import pytz
from typing import List, Dict
from sentence_transformers import SentenceTransformer
import chromadb
import gc
import json
import os
class BrockEventsRAG:
    """Retrieval-augmented assistant over the ExperienceBU events RSS feed.

    On construction the feed is fetched, events starting within the next
    ``LOOKAHEAD_DAYS`` days are embedded with a SentenceTransformer model and
    stored in a ChromaDB collection, and a JSON snapshot of those events is
    written to ``cache/events_cache.json``. ``generate_response`` answers a
    question by querying that collection and formatting the best matches.
    """

    FEED_URL = "https://experiencebu.brocku.ca/events.rss"
    COLLECTION_NAME = "brock_events"
    LOOKAHEAD_DAYS = 14

    def __init__(self):
        """Load the models, open the vector store and index the current events."""
        # Imported here because the module-level imports do not provide these
        # names; transformers is installed as a dependency of
        # sentence-transformers.
        from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

        # Sentence embedding model used for both documents and questions.
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

        # ChromaDB client setup.
        # NOTE(review): `chroma_db_impl="duckdb+parquet"` is the legacy
        # configuration style; confirm it matches the pinned chromadb version.
        self.chroma_client = chromadb.Client(
            chromadb.config.Settings(
                persist_directory="chroma_db",
                chroma_db_impl="duckdb+parquet",
            )
        )

        # LLM model setup
        self.tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-small")
        self.llm = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-small")

        # Date window used to filter events
        self.eastern = pytz.timezone('America/New_York')
        self._refresh_date_window()

        # Cache directory setup
        os.makedirs("cache", exist_ok=True)
        self.cache_file = "cache/events_cache.json"

        # Start from an empty collection, then load the initial events
        self._reset_collection()
        self.update_database()

    def _refresh_date_window(self):
        """Recompute ``today`` (Eastern midnight) and ``date_range_end``."""
        now = datetime.now(self.eastern)
        self.today = now.replace(hour=0, minute=0, second=0, microsecond=0)
        self.date_range_end = self.today + timedelta(days=self.LOOKAHEAD_DAYS)

    def _reset_collection(self):
        """Point ``self.collection`` at a freshly created, empty collection."""
        collection_args = {
            "name": self.COLLECTION_NAME,
            "metadata": {"description": "Brock University Events Database"},
        }
        try:
            self.collection = self.chroma_client.create_collection(**collection_args)
        except Exception:
            # Presumably the collection already exists: drop it and recreate.
            self.chroma_client.delete_collection(self.COLLECTION_NAME)
            self.collection = self.chroma_client.create_collection(**collection_args)

    def fetch_rss_feed(self, url: str) -> List[Dict]:
        """Fetch and parse the RSS feed at ``url``.

        Returns the feed entries, or an empty list if parsing raised.
        """
        try:
            entries = feedparser.parse(url).entries
            print(f"Fetched {len(entries)} entries from the feed.")
            return entries
        except Exception as e:
            print(f"Error fetching RSS feed: {e}")
            return []

    def _parse_rss_time(self, raw):
        """Convert a feed timestamp string to Eastern time, or None if unusable."""
        if not raw:
            return None
        try:
            parsed = datetime.strptime(raw, '%a, %d %b %Y %H:%M:%S %Z')
        except (TypeError, ValueError) as e:
            print(f"Error parsing dates: {e}")
            return None
        # The parsed value is naive; it is treated as UTC before conversion.
        # NOTE(review): assumes the feed always publishes UTC/GMT - confirm.
        return pytz.UTC.localize(parsed).astimezone(self.eastern)

    def _parse_html_time(self, time_elem):
        """Read a ``<time datetime=...>`` element as Eastern local time, or None."""
        if time_elem is None or 'datetime' not in time_elem.attrs:
            return None
        # Drop fractional seconds (and anything after them) before parsing.
        dt_str = time_elem['datetime'].split('.')[0]
        try:
            parsed = datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S')
        except ValueError as e:
            print(f"Error parsing dates: {e}")
            return None
        return self.eastern.localize(parsed)

    def parse_event_datetime(self, entry) -> tuple:
        """Return ``(start, end)`` datetimes for a feed entry.

        The ``start`` / ``end`` fields of the entry are tried first. Whichever
        value is still missing is then looked up in the ``dt-start`` /
        ``dt-end`` ``<time>`` elements of the entry's HTML description. A value
        that cannot be determined is returned as ``None``; one unparsable
        timestamp no longer discards the other.
        """
        try:
            start_dt = self._parse_rss_time(entry.get('start', None))
            end_dt = self._parse_rss_time(entry.get('end', None))

            # If we didn't get both times from RSS, fill the gaps from HTML
            if start_dt is None or end_dt is None:
                soup = BeautifulSoup(getattr(entry, 'description', ''), 'html.parser')
                if start_dt is None:
                    start_dt = self._parse_html_time(soup.find('time', class_='dt-start'))
                if end_dt is None:
                    end_dt = self._parse_html_time(soup.find('time', class_='dt-end'))
            return start_dt, end_dt
        except Exception as e:
            print(f"Error parsing dates: {e}")
            return None, None

    def get_location(self, entry) -> str:
        """Return the event location from the entry, or a placeholder string.

        The entry's ``location`` field is preferred; otherwise the text of the
        ``<span class="p-location">`` element in the HTML description is used.
        """
        try:
            location = entry.get('location', None)
            if not location:
                soup = BeautifulSoup(entry.description, 'html.parser')
                location_elem = soup.find('span', class_='p-location')
                if location_elem:
                    location = location_elem.get_text().strip()
            return location if location else "Location not specified"
        except Exception as e:
            print(f"Error getting location: {e}")
            return "Location not specified"

    def process_event(self, entry) -> Dict:
        """Turn a feed entry into an event dict.

        Returns ``None`` when the entry has no start time, starts outside the
        current date window, or cannot be processed.
        """
        try:
            start_time, end_time = self.parse_event_datetime(entry)

            # Skip if event is not in our date range
            if not start_time or not self.is_event_in_range(start_time):
                return None

            location = self.get_location(entry)

            categories = [tag.term for tag in entry.get('tags', [])]
            categories_str = '; '.join(categories) if categories else 'No categories'

            # A single host may arrive as a bare value rather than a list
            hosts = entry.get('host', [])
            if not isinstance(hosts, list):
                hosts = [hosts]
            hosts_str = '; '.join(str(host) for host in hosts) if hosts else 'No host specified'

            # Strip markup and collapse whitespace in the description
            soup = BeautifulSoup(entry.description, 'html.parser')
            description = ' '.join(soup.get_text().split())

            return {
                'title': entry.title,
                'start_time': start_time,
                'end_time': end_time,
                'location': location,
                'categories': categories_str,
                'hosts': hosts_str,
                'description': description,
                'link': entry.link,
                'guid': entry.guid,
            }
        except Exception as e:
            print(f"Error processing event {entry.get('title', 'Unknown')}: {e}")
            return None

    def is_event_in_range(self, event_time: datetime) -> bool:
        """Return True if ``event_time`` lies in ``[today, date_range_end]``."""
        if not event_time:
            return False
        return self.today <= event_time <= self.date_range_end

    def format_event_text(self, event: Dict) -> str:
        """Render an event dict as the text block that gets embedded.

        The description is truncated to its first 500 characters.
        """
        start = event['start_time']
        end = event['end_time']
        end_label = end.strftime('%I:%M %p') if end else 'not specified'
        lines = [
            "",
            f"Event: {event['title']}",
            f"Date: {start.strftime('%A, %B %d, %Y')}",
            f"Time: {start.strftime('%I:%M %p')} to {end_label}",
            f"Location: {event['location']}",
            f"Categories: {event['categories']}",
            f"Hosted by: {event['hosts']}",
            f"Description: {event['description'][:500]}",
            "",
        ]
        return "\n".join(lines)

    def update_database(self):
        """Re-fetch the feed and rebuild the collection for the current window.

        The date window is recomputed on every call so that a long-running
        process does not keep filtering against the day it was started. The
        collection is only replaced when at least one event was found, so a
        failed or empty fetch leaves the previous index in place.
        """
        print("Fetching events...")
        self._refresh_date_window()
        entries = self.fetch_rss_feed(self.FEED_URL)
        print(f"Found {len(entries)} total events")

        # Keep only the entries that process_event accepts (in range, parsable)
        valid_events = [event for event in map(self.process_event, entries) if event]
        print(f"Found {len(valid_events)} events in the next {self.LOOKAHEAD_DAYS} days")

        if not valid_events:
            print("No events found in date range")
            return

        # Prepare data for database
        documents = [self.format_event_text(event) for event in valid_events]
        metadatas = [{
            'title': event['title'],
            'date': event['start_time'].strftime('%Y-%m-%d'),
            'time': event['start_time'].strftime('%I:%M %p'),
            'location': event['location'],
            'categories': event['categories'],
            'link': event['link'],
        } for event in valid_events]
        ids = [f"event_{i}" for i in range(len(valid_events))]

        # Generate embeddings and replace the collection contents
        try:
            embeddings = self.model.encode(documents)
            # ids are positional, so start from an empty collection rather
            # than adding on top of entries left by an earlier update.
            self._reset_collection()
            self.collection.add(
                documents=documents,
                embeddings=embeddings.tolist(),
                metadatas=metadatas,
                ids=ids,
            )
            print(f"Successfully added {len(valid_events)} events to database")
        except Exception as e:
            print(f"Error adding events to database: {e}")

        # Save to cache
        self.save_cache({
            'last_update': datetime.now().isoformat(),
            'events': valid_events,
        })

        # Clean up
        gc.collect()

    def query(self, question: str, n_results: int = 3) -> Dict:
        """Embed ``question`` and return the collection's query result.

        The result includes documents, metadatas and distances. ``None`` is
        returned if embedding or querying raised.
        """
        try:
            question_embedding = self.model.encode(question)
            return self.collection.query(
                query_embeddings=[question_embedding.tolist()],
                n_results=n_results,
                include=['documents', 'metadatas', 'distances'],
            )
        except Exception as e:
            print(f"Error during query: {e}")
            return None

    def generate_response_with_llm(self, events: List[Dict]) -> str:
        """Use the LLM to phrase a natural-language reply for ``events``.

        Each event dict must provide ``title``, ``start_time`` and
        ``location``. Returns an apology string if generation fails.
        """
        try:
            if not events:
                input_text = "There are no events matching the query. How should I respond?"
            else:
                event_summaries = "\n".join(
                    f"Event: {event['title']}. Start: {event['start_time']}, Location: {event['location']}."
                    for event in events
                )
                input_text = f"Format this information into a friendly response: {event_summaries}"
            inputs = self.tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True)
            outputs = self.llm.generate(**inputs)
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        except Exception as e:
            print(f"Error generating response: {e}")
            return "Sorry, I couldn't generate a response."

    def generate_response(self, question: str, history: list) -> str:
        """Answer ``question`` with an LLM summary plus the top matching events.

        ``history`` is accepted for the chat interface but is not consulted.
        """
        try:
            results = self.query(question)
            if not results or not results['documents'] or not results['documents'][0]:
                return "I couldn't find any events matching your query. Try asking about upcoming events in a different way!"

            top_metadatas = results['metadatas'][0][:3]

            # The stored metadata keeps date and time separately; the LLM
            # prompt expects a single start_time value per event.
            matched_events = [{
                'title': metadata['title'],
                'start_time': f"{metadata['date']} at {metadata['time']}",
                'location': metadata['location'],
            } for metadata in top_metadatas]

            parts = [self.generate_response_with_llm(matched_events), "\n\n"]

            # Add top 3 matching events
            for i, metadata in enumerate(top_metadatas, 1):
                parts.append(f"{i}. **{metadata['title']}**\n")
                parts.append(f"📅 {metadata['date']} at {metadata['time']}\n")
                parts.append(f"📍 {metadata['location']}\n")
                if 'categories' in metadata:
                    parts.append(f"🏷️ {metadata['categories']}\n")
                parts.append(f"🔗 More info: {metadata['link']}\n\n")

            # Add a helpful prompt
            parts.append("\nYou can ask me for more specific details about any of these events!")
            return "".join(parts)
        except Exception as e:
            print(f"Error generating response: {e}")
            return "I encountered an error while searching for events. Please try asking in a different way."

    def save_cache(self, data: dict):
        """Write ``data`` (``last_update`` plus ``events``) to the cache file.

        Event ``start_time`` / ``end_time`` datetimes are stored as ISO-8601
        strings so the payload is JSON serializable.
        """
        try:
            serializable_data = {
                'last_update': data['last_update'],
                'events': [],
            }
            for event in data['events']:
                event_copy = event.copy()
                for field in ('start_time', 'end_time'):
                    if event_copy.get(field):
                        event_copy[field] = event_copy[field].isoformat()
                serializable_data['events'].append(event_copy)

            with open(self.cache_file, 'w', encoding='utf-8') as f:
                json.dump(serializable_data, f, ensure_ascii=False, indent=2)
            print(f"Cache saved successfully to {self.cache_file}")
        except Exception as e:
            print(f"Error saving cache: {e}")

    def load_cache(self) -> dict:
        """Load the cache file, converting event timestamps back to datetimes.

        Returns ``{'last_update': None, 'events': []}`` when the file is
        missing or unreadable. Nothing in this class calls this method yet.
        """
        try:
            if os.path.exists(self.cache_file):
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                for event in data['events']:
                    for field in ('start_time', 'end_time'):
                        if event.get(field):
                            event[field] = datetime.fromisoformat(event[field])
                return data
            return {'last_update': None, 'events': []}
        except Exception as e:
            print(f"Error loading cache: {e}")
            return {'last_update': None, 'events': []}

    def should_update_cache(self) -> bool:
        """Return True when the cache is missing or older than 24 hours.

        Nothing in this class calls this method yet.
        """
        try:
            cached_data = self.load_cache()
            if not cached_data['last_update']:
                return True
            last_update = datetime.fromisoformat(cached_data['last_update'])
            time_since_update = datetime.now() - last_update
            return time_since_update.total_seconds() > 86400  # 24 hours
        except Exception as e:
            print(f"Error checking cache: {e}")
            return True
def create_demo():
    """Build and return the Gradio Blocks chat interface.

    A ``BrockEventsRAG`` instance is created up front and shared by all event
    handlers. The page has a single chatbot in "messages" format, a question
    box with an Ask button, and buttons to clear the chat and to refresh the
    events database.
    """
    # Initialize the RAG system
    rag_system = BrockEventsRAG()

    # Custom CSS for better appearance
    custom_css = """
    .gr-button-primary {
        background-color: #8b0000 !important;
        border-color: #8b0000 !important;
    }
    """

    header_md = (
        "# 🎓 Brock University Events Assistant\n"
        "Ask me about upcoming events at Brock! I can help you discover:\n"
        "- Academic workshops\n"
        "- Student activities\n"
        "- Campus events\n"
        "- And more!\n"
    )
    # The blank line keeps the closing paragraph out of the last bullet.
    tips_md = (
        "### Tips:\n"
        "- Ask about specific dates, locations, or event types\n"
        "- You can refresh the events database using the button above\n"
        "- Click on event links to get more details on ExperienceBU\n"
        "\n"
        "Data is refreshed automatically every 24 hours. Events shown are for the next 14 days.\n"
    )

    def respond(message, history):
        """Append the user's question and the assistant's answer to the chat."""
        history = list(history or [])
        # Nothing to answer: leave the conversation untouched.
        if not message or not message.strip():
            return "", history
        bot_message = rag_system.generate_response(message, history)
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": bot_message})
        return "", history

    def refresh_events():
        """Rebuild the events database and return a status message."""
        rag_system.update_database()
        return "Events database has been refreshed!"

    # Create the Gradio interface
    with gr.Blocks(css=custom_css) as demo:
        gr.Markdown(header_md)

        # One chatbot only; "messages" matches the role/content dicts that
        # respond() appends.
        chatbot = gr.Chatbot(
            label="Chat History",
            height=400,
            bubble_full_width=False,
            type="messages",
        )

        with gr.Row():
            msg = gr.Textbox(
                label="Your Question",
                placeholder="e.g., What events are happening this week?",
                scale=4,
            )
            submit = gr.Button("Ask", scale=1, variant="primary")

        with gr.Row():
            clear = gr.Button("Clear Chat")
            refresh = gr.Button("Refresh Events")

        # Event handlers
        submit.click(respond, [msg, chatbot], [msg, chatbot])
        msg.submit(respond, [msg, chatbot], [msg, chatbot])
        clear.click(lambda: None, None, chatbot)
        # The refresh status text is shown in the question box.
        refresh.click(refresh_events, None, msg)

        # Example questions
        gr.Examples(
            examples=[
                "What events are happening this week?",
                "Are there any workshops in the library?",
                "Tell me about upcoming career events",
                "What's happening in the MakerSpace?",
                "Any student club meetings soon?",
            ],
            inputs=msg,
        )

        gr.Markdown(tips_md)

    return demo
if __name__ == "__main__":
    # Launch settings for the hosted app.
    launch_kwargs = {
        "server_name": "0.0.0.0",  # Required for Spaces
        "server_port": 7860,       # Default port
        "share": False,            # Don't create a public link
        "max_threads": 40,         # Handle concurrent users
    }
    demo = create_demo()
    demo.launch(**launch_kwargs)