Spaces:
Sleeping
Sleeping
| """ | |
| Research Assistant Component | |
| Main research assistant logic and workflow management | |
| """ | |
| import os | |
| import json | |
| from typing import List, Dict, Optional, Any | |
| from datetime import datetime | |
| import logging | |
| from .config import Config | |
| from .groq_processor import GroqProcessor | |
| from .rag_system import RAGSystem | |
| from .unified_fetcher import PaperFetcher | |
| from .pdf_processor import PDFProcessor | |
| from .trend_monitor import AdvancedTrendMonitor | |
class ProjectManager:
    """Keep research projects in memory and persist them to a JSON file.

    Projects are stored in ``self.projects`` keyed by project id and written
    to ``<config.BASE_DIR>/projects.json`` after every mutation.  Methods that
    accept ``user_id`` enforce ownership whenever a value other than ``None``
    is supplied; ``None`` means "no ownership check" (admin-style access).
    """

    def __init__(self, config: "Config" = None):
        """Create the manager and load any previously saved projects.

        Args:
            config: Configuration object exposing ``BASE_DIR``; a default
                ``Config()`` is created when omitted.
        """
        self.config = config or Config()
        self.projects = {}
        self.project_counter = 0
        self.projects_file = os.path.join(self.config.BASE_DIR, 'projects.json')
        self.load_projects()

    def load_projects(self):
        """Load projects and the id counter from ``self.projects_file``.

        A missing file is not an error.  Read or parse failures are reported
        on stdout and leave the in-memory state untouched.
        """
        try:
            if not os.path.exists(self.projects_file):
                return
            with open(self.projects_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error loading projects: {e}")
            return

        if not isinstance(data, dict):
            print(f"Error loading projects: unexpected data format in {self.projects_file}")
            return

        projects = data.get('projects', {})
        counter = data.get('counter', 0)
        self.projects = projects if isinstance(projects, dict) else {}
        self.project_counter = counter if isinstance(counter, int) else 0
        print(f"Loaded {len(self.projects)} projects")

    def save_projects(self):
        """Write all projects and the id counter to ``self.projects_file``.

        The data is serialised to a temporary sibling file first and then
        moved into place, so a failed dump (for example an unserialisable
        paper) never truncates the existing store.  Failures are reported on
        stdout rather than raised.
        """
        tmp_path = f"{self.projects_file}.tmp"
        try:
            directory = os.path.dirname(self.projects_file)
            # os.makedirs('') raises, so skip it for a bare file name.
            if directory:
                os.makedirs(directory, exist_ok=True)
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump({
                    'projects': self.projects,
                    'counter': self.project_counter
                }, f, indent=2)
            os.replace(tmp_path, self.projects_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving projects: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup: the temp file may never have been created.
                pass

    def _owned_project(self, project_id: str, user_id: str = None) -> Optional[Dict[str, Any]]:
        """Return the project if it exists and ``user_id`` may access it."""
        project = self.projects.get(project_id)
        if project is None:
            return None
        # Compare against None explicitly: an empty-string user id must not
        # silently disable the ownership check.
        if user_id is not None and project.get('user_id') != user_id:
            return None
        return project

    def create_project(self, name: str, research_question: str, keywords: List[str], user_id: str) -> str:
        """Create, store and persist a new research project.

        Args:
            name: Display name of the project.
            research_question: The question the project investigates.
            keywords: Search keywords associated with the project.
            user_id: Identifier of the user who owns the project.

        Returns:
            The generated project id (``project_<n>``).
        """
        self.project_counter += 1
        project_id = f"project_{self.project_counter}"
        # Guard against a stale counter in the store overwriting a project.
        while project_id in self.projects:
            self.project_counter += 1
            project_id = f"project_{self.project_counter}"

        timestamp = datetime.now().isoformat()
        self.projects[project_id] = {
            'id': project_id,
            'name': name,
            'research_question': research_question,
            'keywords': keywords,
            'papers': [],
            'notes': [],
            'status': 'active',
            'user_id': user_id,  # Track which user created this project
            'created_at': timestamp,
            'updated_at': timestamp
        }
        self.save_projects()
        return project_id

    def get_project(self, project_id: str, user_id: str = None) -> Optional[Dict[str, Any]]:
        """Return a project by id, or ``None`` if missing or not owned.

        Args:
            project_id: Id of the project to fetch.
            user_id: When not ``None``, the project is only returned if its
                stored ``user_id`` matches.
        """
        return self._owned_project(project_id, user_id)

    def update_project(self, project_id: str, user_id: str = None, **kwargs):
        """Merge ``kwargs`` into a project, refresh ``updated_at`` and save.

        Returns:
            ``True`` on success, ``False`` if the project does not exist or
            the ownership check fails.
        """
        project = self._owned_project(project_id, user_id)
        if project is None:
            return False
        project.update(kwargs)
        project['updated_at'] = datetime.now().isoformat()
        self.save_projects()
        return True

    def add_paper_to_project(self, project_id: str, paper: Dict[str, Any], user_id: str = None):
        """Append a paper to a project's ``papers`` list and persist it.

        Returns:
            ``True`` on success, ``False`` if the project does not exist or
            the ownership check fails.
        """
        project = self._owned_project(project_id, user_id)
        if project is None:
            return False
        # Projects loaded from an older store may lack the 'papers' key.
        project.setdefault('papers', []).append(paper)
        self.update_project(project_id, user_id=user_id)
        return True

    def list_projects(self, user_id: str = None) -> List[Dict[str, Any]]:
        """List projects, restricted to one owner when ``user_id`` is given.

        With ``user_id=None`` every project is returned (for admin use).
        """
        if user_id is not None:
            return [project for project in self.projects.values()
                    if project.get('user_id') == user_id]
        return list(self.projects.values())
class SimpleResearchAssistant:
    """
    Simplified research assistant that combines all components

    Wires together the Groq processor, RAG system, paper fetcher, PDF
    processor, project manager and trend monitor behind one object.
    """

    def __init__(self, config: "Config" = None):
        """Build every component from ``config`` (default: ``Config()``)."""
        self.config = config or Config()

        # Configure logging first so component start-up messages honour the
        # configured level; fall back to INFO for unknown level names.
        level = getattr(self.config, 'LOG_LEVEL', 'INFO')
        if isinstance(level, str):
            level = getattr(logging, level.upper(), logging.INFO)
        if not isinstance(level, int):
            level = logging.INFO
        logging.basicConfig(level=level)
        self.logger = logging.getLogger(__name__)

        # Initialize components
        print("Initializing Research Assistant...")
        self.groq_processor = GroqProcessor(self.config)
        self.rag_system = RAGSystem(self.config)
        self.paper_fetcher = PaperFetcher(self.config)
        self.pdf_processor = PDFProcessor(self.config)
        self.project_manager = ProjectManager(self.config)
        self.trend_monitor = AdvancedTrendMonitor(self.groq_processor)
        print("Research Assistant initialized!")

    def search_papers(self, query: str, max_results: int = 10, sources: List[str] = None) -> List[Dict[str, Any]]:
        """
        Search for papers across multiple sources

        Papers that are found are also handed to the RAG system; a failure
        there is reported but does not affect the returned list.

        Args:
            query: Search query
            max_results: Maximum number of results
            sources: List of sources to search ['arxiv', 'semantic_scholar', 'crossref', 'pubmed']

        Returns:
            List of papers (empty if the search itself fails)
        """
        # Use all sources by default for comprehensive search
        if sources is None:
            sources = ['arxiv', 'semantic_scholar', 'crossref', 'pubmed']

        self.logger.info(f"Searching for: {query}")
        print(f"DEBUG: Starting multi-source search for '{query}' with max_results={max_results}")
        print(f"DEBUG: Using sources: {sources}")

        try:
            # Use the unified fetcher for all sources
            papers = self.paper_fetcher.search_papers(query, max_results, sources=sources)
            print(f"DEBUG: Unified fetcher returned {len(papers)} papers")

            # Add to RAG system for future querying
            if papers:
                try:
                    self.rag_system.add_papers(papers)
                    print("DEBUG: Papers added to RAG system")
                except Exception as e:
                    # Indexing is best-effort; the search results are still valid.
                    print(f"DEBUG: Failed to add papers to RAG system: {e}")

            self.logger.info(f"Found {len(papers)} papers from {len(sources)} sources")
            print(f"DEBUG: Returning {len(papers)} papers from multi-source search")
            return papers
        except Exception as e:
            print(f"DEBUG: Multi-source search failed: {e}")
            self.logger.error(f"Multi-source search failed: {e}")
            return []

    def ask_question(self, question: str, context: str = None) -> Dict[str, Any]:
        """
        Answer a research question using RAG

        Args:
            question: Research question
            context: Optional context; only used by the direct-LLM fallback,
                the RAG path is called with the question alone

        Returns:
            Answer with sources
        """
        self.logger.info(f"Answering question: {question}")

        # Use RAG system if available
        if self.rag_system.vectorstore:
            return self.rag_system.answer_question(question)

        # Fallback to direct LLM
        answer = self.groq_processor.answer_question(question, context or "")
        return {
            'answer': answer,
            'sources': [],
            'method': 'direct_llm'
        }

    def process_pdf(self, file_path: str) -> Dict[str, Any]:
        """
        Process a PDF file

        Extracts the text, derives a title and abstract heuristically,
        summarises the paper and tries to add it to the RAG system.

        Args:
            file_path: Path to PDF file

        Returns:
            Processing result; ``{'success': False, 'error': ...}`` when text
            extraction reports an error
        """
        self.logger.info(f"Processing PDF: {file_path}")

        # Extract text
        extraction_result = self.pdf_processor.extract_text_from_file(file_path)
        if extraction_result.get('error'):
            return {'success': False, 'error': extraction_result['error']}

        text = extraction_result.get('text', '')
        # Tolerate an explicit None for the metadata entry.
        metadata = extraction_result.get('metadata') or {}

        # Extract basic information
        title = self._extract_title_from_text(text)
        abstract = self._extract_abstract_from_text(text)

        # Generate summary using Groq
        summary = self.groq_processor.summarize_paper(title, abstract, text)

        processed_at = datetime.now().isoformat()

        # Create paper object
        paper = {
            'title': title,
            'abstract': abstract,
            'content': text,
            'summary': summary,
            'source': 'uploaded_pdf',
            'file_path': file_path,
            'processed_at': processed_at,
            'metadata': metadata
        }

        # Try to add to RAG system (don't fail if RAG is not initialized)
        try:
            self.rag_system.add_papers([paper])
        except Exception as e:
            self.logger.warning(f"Could not add paper to RAG system: {e}")

        # Return formatted response with all expected fields
        return {
            'success': True,
            'title': title,
            'abstract': abstract,
            'text_length': len(text),
            'processed_at': processed_at,
            'summary': summary,
            'paper': paper,
            'word_count': extraction_result.get('word_count', 0),
            'pages': metadata.get('pages', 0)
        }

    def analyze_trends(self, topic: str, max_papers: int = 50) -> Dict[str, Any]:
        """
        Analyze research trends for a topic using advanced trend monitoring

        Args:
            topic: Research topic
            max_papers: Maximum papers to analyze

        Returns:
            Advanced trend analysis, or ``{'error': ...}`` when the search
            yields no papers
        """
        self.logger.info(f"Analyzing trends for: {topic}")
        print(f"📊 Starting advanced trend analysis for '{topic}'")

        # Get papers from multiple sources for comprehensive analysis
        papers = self.search_papers(topic, max_papers)
        if not papers:
            return {'error': 'No papers found for trend analysis'}
        print(f"📊 Found {len(papers)} papers for trend analysis")

        # Use advanced trend monitor for comprehensive analysis
        trend_report = self.trend_monitor.generate_trend_report(papers)

        # Add metadata
        trend_report['query_metadata'] = {
            'topic': topic,
            'papers_analyzed': len(papers),
            'analysis_date': datetime.now().isoformat(),
            'analysis_type': 'advanced_trend_monitoring'
        }
        return trend_report

    def create_project(self, name: str, research_question: str, keywords: List[str], user_id: str) -> str:
        """Create a new research project"""
        return self.project_manager.create_project(name, research_question, keywords, user_id)

    def get_project(self, project_id: str, user_id: str = None) -> Optional[Dict[str, Any]]:
        """Get a project by ID"""
        return self.project_manager.get_project(project_id, user_id)

    def list_projects(self, user_id: str = None) -> List[Dict[str, Any]]:
        """List projects"""
        return self.project_manager.list_projects(user_id)

    def conduct_literature_search(self, project_id: str, max_papers: int = 20, user_id: str = None) -> Dict[str, Any]:
        """
        Conduct literature search for a project

        The query is the project's research question followed by its
        keywords; every paper found is appended to the project.

        Args:
            project_id: Project ID
            max_papers: Maximum papers to find
            user_id: User ID to check ownership

        Returns:
            Search results, or ``{'error': ...}`` when the project is missing
            or not accessible
        """
        project = self.project_manager.get_project(project_id, user_id)
        if not project:
            return {'error': 'Project not found or access denied'}

        # Build search query; tolerate projects without keywords.
        keywords = project.get('keywords') or []
        keyword_text = ' '.join(str(keyword) for keyword in keywords)
        query = f"{project.get('research_question', '')} {keyword_text}".strip()

        # Search for papers
        papers = self.search_papers(query, max_papers)

        # Add papers to project
        for paper in papers:
            self.project_manager.add_paper_to_project(project_id, paper, user_id)

        return {
            'project_id': project_id,
            'papers_found': len(papers),
            'papers': papers
        }

    def generate_literature_review(self, project_id: str, user_id: str = None) -> Dict[str, Any]:
        """
        Generate a literature review for a project

        Args:
            project_id: Project ID
            user_id: User ID to check ownership

        Returns:
            Literature review, or ``{'error': ...}`` on any failure
        """
        try:
            project = self.project_manager.get_project(project_id, user_id)
            if not project:
                return {'error': 'Project not found or access denied'}

            papers = project.get('papers', [])
            if not papers:
                return {'error': 'No papers found in project'}

            print(f"Generating review for project {project_id} with {len(papers)} papers...")

            # Generate review
            review_content = self.groq_processor.generate_literature_review(
                papers,
                project['research_question']
            )
            print(f"Review generated, length: {len(review_content) if review_content else 0}")

            # An empty result or one beginning with "Error" is treated as failure.
            if not review_content or review_content.startswith("Error"):
                return {'error': f'Failed to generate review: {review_content}'}

            return {
                'project_id': project_id,
                'review': {
                    'content': review_content,
                    'papers_count': len(papers),
                    'research_question': project['research_question']
                },
                'papers_reviewed': len(papers),
                'generated_at': datetime.now().isoformat()
            }
        except Exception as e:
            print(f"Error in generate_literature_review: {str(e)}")
            return {'error': f'Unexpected error: {str(e)}'}

    def get_system_status(self) -> Dict[str, Any]:
        """Get system status

        The component states are fixed ``'ready'`` strings, not live health
        checks; only the RAG chunk count and config summary are queried.
        """
        return {
            'status': 'operational',
            'components': {
                'groq_processor': 'ready',
                'rag_system': 'ready',
                'arxiv_fetcher': 'ready',
                'pdf_processor': 'ready',
                'project_manager': 'ready'
            },
            'statistics': {
                'rag_documents': self.rag_system.get_database_stats().get('total_chunks', 0),
                'system_version': '2.0.0',
                'status_check_time': datetime.now().isoformat()
            },
            'config': self.config.get_summary()
        }

    def _extract_title_from_text(self, text: str) -> str:
        """Extract title from PDF text

        Returns the first of the leading 20 lines that is between 11 and 199
        characters long and contains none of the metadata markers, or
        ``"Unknown Title"`` when no line qualifies.
        """
        metadata_markers = ('page', 'arxiv', 'doi', 'submitted', 'accepted')
        for raw_line in text.split('\n')[:20]:  # Check first 20 lines
            line = raw_line.strip()
            if not 10 < len(line) < 200:
                continue
            lowered = line.lower()
            # Skip lines that look like headers or metadata
            if any(marker in lowered for marker in metadata_markers):
                continue
            return line
        return "Unknown Title"

    def _extract_abstract_from_text(self, text: str) -> str:
        """Extract abstract from PDF text

        Finds the first case-insensitive occurrence of "abstract", takes the
        text after that heading up to the next recognised section header, and
        truncates it to 1000 characters (plus "...").  Returns
        ``"Abstract not found"`` when no heading or no body text is found.
        """
        heading = 'abstract'
        text_lower = text.lower()

        # Look for abstract section
        heading_pos = text_lower.find(heading)
        if heading_pos == -1:
            return "Abstract not found"

        # Slice the heading off by position: a case-sensitive replace would
        # keep an "Abstract" heading and delete a later lowercase "abstract".
        body_start = heading_pos + len(heading)
        body = text[body_start:]
        body_lower = text_lower[body_start:]

        # Look for common section headers that might follow abstract
        section_headers = ['introduction', '1. introduction', '1 introduction', 'keywords', 'key words']
        end_pos = len(body)
        for header in section_headers:
            pos = body_lower.find(header)
            if pos != -1 and pos < end_pos:
                end_pos = pos

        # Clean up: surrounding whitespace and separators such as "Abstract:".
        abstract = body[:end_pos].strip().lstrip(':.-–— \t\r\n')
        if not abstract:
            return "Abstract not found"
        if len(abstract) > 1000:
            abstract = abstract[:1000] + "..."
        return abstract
class ResearchMate:
    """
    Main ResearchMate interface
    Simplified wrapper around the research assistant

    Every public method returns a dict with a ``success`` flag and converts
    exceptions into ``{'success': False, 'error': ...}``; the exceptions are
    ``trend_monitor`` and ``search_papers``, which pass straight through.
    """

    def __init__(self, config: "Config" = None):
        """Create the underlying assistant from ``config`` (default: ``Config()``)."""
        self.config = config or Config()
        self.assistant = SimpleResearchAssistant(self.config)
        self.version = "2.0.0"
        self.initialized_at = datetime.now().isoformat()
        print(f"ResearchMate {self.version} initialized!")

    @staticmethod
    def _with_success(result: Dict[str, Any]) -> Dict[str, Any]:
        """Add a ``success`` flag that is False when ``result`` has an ``error`` key."""
        return {'success': 'error' not in result, **result}

    @staticmethod
    def _unique(items) -> List[Any]:
        """Return ``items`` without duplicates, keeping first-seen order."""
        return list(dict.fromkeys(items))

    @staticmethod
    def _safe_year(paper: Dict[str, Any]) -> Optional[int]:
        """Return the paper's ``year`` as a plausible int, else ``None``.

        Accepts ints, numeric strings and integral floats.  Years before 1900
        or later than next year (never less than 2030) are rejected.
        """
        year = paper.get('year')
        if year is None or isinstance(year, bool):
            return None
        if isinstance(year, str):
            try:
                year = int(year)
            except ValueError:
                return None
        elif isinstance(year, float):
            if not year.is_integer():
                return None
            year = int(year)
        if not isinstance(year, int):
            return None
        latest = max(2030, datetime.now().year + 1)
        return year if 1900 <= year <= latest else None

    @staticmethod
    def _collect_authors(papers: List[Dict[str, Any]]) -> List[str]:
        """Flatten the ``authors`` entries of all papers into a list of names."""
        names = []
        for paper in papers:
            raw = paper.get('authors')
            if not raw:
                continue
            if isinstance(raw, str):
                entries = [raw]
            elif isinstance(raw, list):
                entries = raw
            else:
                continue
            for entry in entries:
                if isinstance(entry, dict):
                    # NOTE(review): assumes dict-shaped authors carry a 'name'
                    # key - confirm against the fetchers' output.
                    entry = entry.get('name')
                if entry is None:
                    continue
                name = str(entry).strip()
                if name:
                    names.append(name)
        return names

    @staticmethod
    def _collect_keywords(papers: List[Dict[str, Any]]) -> List[str]:
        """Flatten ``keywords`` (list or comma-separated string) into stripped terms."""
        terms = []
        for paper in papers:
            raw = paper.get('keywords')
            if not raw:
                continue
            if isinstance(raw, str):
                entries = raw.split(',')
            elif isinstance(raw, list):
                entries = raw
            else:
                continue
            for entry in entries:
                term = str(entry).strip()
                if term:
                    terms.append(term)
        return terms

    def search(self, query: str, max_results: int = 10) -> Dict[str, Any]:
        """Search for papers"""
        try:
            papers = self.assistant.search_papers(query, max_results)
            return {
                'success': True,
                'query': query,
                'papers': papers,
                'count': len(papers)
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def ask(self, question: str) -> Dict[str, Any]:
        """Ask a research question"""
        try:
            result = self.assistant.ask_question(question)
            return {
                'success': True,
                'question': question,
                'answer': result['answer'],
                'sources': result.get('sources', [])
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def upload_pdf(self, file_path: str) -> Dict[str, Any]:
        """Process uploaded PDF"""
        try:
            # process_pdf already returns a dict carrying its own 'success' flag.
            return self.assistant.process_pdf(file_path)
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def analyze_trends(self, topic: str) -> Dict[str, Any]:
        """Analyze research trends

        ``success`` is False when the assistant's result contains ``error``.
        """
        try:
            return self._with_success(self.assistant.analyze_trends(topic))
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def create_project(self, name: str, research_question: str, keywords: List[str], user_id: str) -> Dict[str, Any]:
        """Create research project"""
        try:
            project_id = self.assistant.create_project(name, research_question, keywords, user_id)
            return {
                'success': True,
                'project_id': project_id,
                'message': f'Project "{name}" created successfully'
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def get_project(self, project_id: str, user_id: str = None) -> Dict[str, Any]:
        """Get project details"""
        try:
            project = self.assistant.get_project(project_id, user_id)
            if project:
                return {'success': True, 'project': project}
            return {'success': False, 'error': 'Project not found or access denied'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def list_projects(self, user_id: str = None) -> Dict[str, Any]:
        """List projects"""
        try:
            projects = self.assistant.list_projects(user_id)
            return {'success': True, 'projects': projects}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def search_project_literature(self, project_id: str, max_papers: int = 20, user_id: str = None) -> Dict[str, Any]:
        """Search literature for a project

        ``success`` is False when the assistant's result contains ``error``.
        """
        try:
            result = self.assistant.conduct_literature_search(project_id, max_papers, user_id)
            return self._with_success(result)
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def generate_review(self, project_id: str, user_id: str = None) -> Dict[str, Any]:
        """Generate literature review for a project

        ``success`` is False when the assistant's result contains ``error``.
        """
        try:
            result = self.assistant.generate_literature_review(project_id, user_id)
            return self._with_success(result)
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def get_status(self) -> Dict[str, Any]:
        """Get system status"""
        try:
            status = self.assistant.get_system_status()
            return {'success': True, **status}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def analyze_project(self, project_id: str, user_id: str = None) -> Dict[str, Any]:
        """Analyze project literature

        Computes descriptive statistics over the project's papers: year
        range, authors ranked by number of papers, de-duplicated keywords,
        papers from 2020 onwards, and two Markdown text blocks.

        Args:
            project_id: Project ID
            user_id: User ID to check ownership

        Returns:
            ``{'success': True, 'project_id', 'analysis', 'timestamp'}`` or
            ``{'success': False, 'error': ...}``
        """
        # Function-scope import keeps this block independent of the file header.
        from collections import Counter

        try:
            project = self.assistant.get_project(project_id, user_id)
            if not project:
                return {'success': False, 'error': 'Project not found or access denied'}

            # Basic project analysis
            papers = project.get('papers', [])
            if not papers:
                return {'success': False, 'error': 'No papers found in project'}

            total_papers = len(papers)

            # Resolve each paper's year once and reuse it below.
            paper_years = [self._safe_year(paper) for paper in papers]
            years = [year for year in paper_years if year is not None]
            distinct_years = len(set(years))

            # Rank authors by how many papers they appear on; ties keep
            # first-seen order, so the output is deterministic.
            authors = self._collect_authors(papers)
            author_counts = Counter(authors)
            ranked_authors = [name for name, _ in author_counts.most_common()]
            unique_authors = len(ranked_authors)

            # Extract key topics from keywords
            keywords = self._collect_keywords(papers)
            key_topics = self._unique(term.lower() for term in keywords)[:10]
            themes = ', '.join(self._unique(term.title() for term in keywords)[:5])

            # Calculate year range safely
            year_range = "Unknown"
            if years:
                min_year = min(years)
                max_year = max(years)
                year_range = f"{min_year} - {max_year}" if min_year != max_year else str(min_year)

            # Count recent papers safely
            recent_papers = [
                paper for paper, year in zip(papers, paper_years)
                if year is not None and year >= 2020
            ]
            recent_papers_count = len(recent_papers)

            # Pre-compute the text fragments used by the Markdown blocks.
            if distinct_years > 1:
                scope = f"Multi-year analysis spanning {distinct_years} different years"
            else:
                scope = "Limited temporal scope"
            activity = "Active research area" if total_papers > 10 else "Emerging research area"
            published = f" published between {year_range}" if years else ""
            if len(authors) >= 3:
                contributors = f", with top contributors including {', '.join(ranked_authors[:3])}"
            else:
                contributors = ""
            if themes:
                focus = "The literature covers diverse topics including " + themes
            else:
                focus = "The research focus requires further analysis based on paper content"
            if recent_papers_count > total_papers * 0.5:
                temporal = "Recent research activity is strong"
            else:
                temporal = "Includes both historical and recent contributions"
            maturity = "Well-established research area" if total_papers > 20 else "Growing research area"
            engagement = "strong" if unique_authors > 15 else "moderate"

            insights = "\n".join([
                "## Key Research Insights",
                f"**Total Literature:** {total_papers} papers analyzed",
                f"**Research Scope:** {scope}",
                f"**Author Collaboration:** {unique_authors} unique researchers identified",
                f"**Key Themes:** {themes if themes else 'No specific themes identified'}",
                f"**Research Activity:** {activity}",
            ]) + "\n"

            summary = "\n".join([
                "## Literature Analysis Summary",
                f"This project contains **{total_papers} research papers**{published}.",
                f"**Research Community:** The work involves {unique_authors} unique authors{contributors}.",
                f"**Research Focus:** {focus}.",
                f"**Temporal Distribution:** {temporal}.",
                f"**Research Maturity:** {maturity} with {engagement} community engagement.",
            ]) + "\n"

            # Basic analysis
            analysis = {
                'total_papers': total_papers,
                'year_range': year_range,
                'unique_authors': unique_authors,
                'top_authors': ranked_authors[:10],
                'key_topics': key_topics,
                'recent_papers': recent_papers[:5],
                'trends': f"Based on {total_papers} papers" + (f" spanning {year_range}" if years else ""),
                'insights': insights,
                'summary': summary
            }

            return {
                'success': True,
                'project_id': project_id,
                'analysis': analysis,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def ask_project_question(self, project_id: str, question: str, user_id: str = None) -> Dict[str, Any]:
        """Ask a question about a specific project

        The project's name, research question and keywords are prepended to
        the question as context before it is sent to the assistant.

        Args:
            project_id: Project ID
            question: Question to answer
            user_id: User ID to check ownership; ``None`` skips the check
        """
        try:
            project = self.assistant.get_project(project_id, user_id)
            if not project:
                return {'success': False, 'error': 'Project not found'}

            # Context-aware question answering
            keywords = project.get('keywords') or []
            context = f"Project: {project.get('name', '')}\n"
            context += f"Research Question: {project.get('research_question', '')}\n"
            context += f"Keywords: {', '.join(str(keyword) for keyword in keywords)}\n"

            # Use RAG system with project context
            full_question = f"Context: {context}\n\nQuestion: {question}"
            result = self.assistant.ask_question(full_question)

            return {
                'success': True,
                'project_id': project_id,
                'question': question,
                'answer': result['answer'],
                'sources': result.get('sources', [])
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def trend_monitor(self):
        """Access to the advanced trend monitor

        This is a regular method (call it), not a property.
        """
        return self.assistant.trend_monitor

    def search_papers(self, query: str, max_results: int = 10):
        """Direct access to paper search"""
        return self.assistant.search_papers(query, max_results)