Spaces:
Running
Running
| import os | |
| import re | |
| import json | |
| import time | |
| import requests | |
| from typing import List, Dict, Any, Optional | |
| from pathlib import Path | |
| import tempfile | |
| from io import BytesIO | |
| # Imports pour les outils | |
| from ddgs import DDGS | |
| from bs4 import BeautifulSoup | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| from fake_useragent import UserAgent | |
| from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound | |
| # Imports pour la lecture de fichiers | |
| import PyPDF2 | |
| from docx import Document | |
| import openpyxl | |
| import pandas as pd | |
| import markdown | |
| import magic | |
| from PIL import Image | |
| import numpy as np | |
| from smolagents import tool | |
| class AdvancedWebSearchTool: | |
| """Outil de recherche web avancé avec DuckDuckGo""" | |
| def __init__(self): | |
| self.ddgs = DDGS() | |
| self.ua = UserAgent() | |
| def search_web(self, query: str, max_results: int = 10, region: str = "en-en") -> List[Dict[str, str]]: | |
| """ | |
| Recherche avancée sur le web avec DuckDuckGo | |
| Args: | |
| query: Requête de recherche | |
| max_results: Nombre maximum de résultats (défaut: 10) | |
| region: Région de recherche (défaut: en-en) | |
| Returns: | |
| Liste de dictionnaires avec title, url, snippet | |
| """ | |
| try: | |
| print(f"🔍 Recherche: '{query}' (max: {max_results})") | |
| results = [] | |
| search_results = self.ddgs.text( | |
| query=query, | |
| region=region, | |
| max_results=max_results, | |
| timelimit="m" # Résultats récents | |
| ) | |
| for result in search_results: | |
| results.append({ | |
| "title": result.get("title", ""), | |
| "url": result.get("href", ""), | |
| "snippet": result.get("body", "") | |
| }) | |
| print(f"✅ Trouvé {len(results)} résultats") | |
| return results | |
| except Exception as e: | |
| print(f"❌ Erreur lors de la recherche: {e}") | |
| return [{"error": str(e)}] | |
| class AdvancedWebScrapingTool: | |
| """Outil de scraping web simplifié SANS Selenium.""" | |
| def __init__(self): | |
| self.ua = UserAgent() | |
| # On supprime toute référence au driver Selenium | |
| def scrape_website(self, url: str, extract_text: bool = True, extract_links: bool = False) -> Dict[str, Any]: | |
| """Scrape un site web en utilisant uniquement requests et BeautifulSoup.""" | |
| try: | |
| print(f"🌐 Scraping (requests uniquement): {url}") | |
| headers = {'User-Agent': self.ua.random} | |
| response = requests.get(url, headers=headers, timeout=30) | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| return result | |
| except requests.exceptions.RequestException as e: | |
| print(f"❌ Erreur scraping avec requests: {e}") | |
| return {"error": str(e), "url": url} | |
| class FileReaderTool: | |
| """Outil de lecture de fichiers multi-format""" | |
| def read_file(self, file_path: str) -> Dict[str, Any]: | |
| """ | |
| Lit un fichier et extrait son contenu selon le format | |
| Args: | |
| file_path: Chemin vers le fichier | |
| Returns: | |
| Dictionnaire avec le contenu du fichier | |
| """ | |
| try: | |
| print(f"📄 Lecture du fichier: {file_path}") | |
| if not os.path.exists(file_path): | |
| return {"error": f"Fichier non trouvé: {file_path}"} | |
| # Détection du type de fichier | |
| file_ext = Path(file_path).suffix.lower() | |
| result = { | |
| "file_path": file_path, | |
| "file_type": file_ext, | |
| "content": "", | |
| "metadata": {}, | |
| "status": "success" | |
| } | |
| if file_ext == '.pdf': | |
| result.update(self._read_pdf(file_path)) | |
| # Word Documents | |
| elif file_ext in ['.docx', '.doc']: | |
| result.update(self._read_docx(file_path)) | |
| # Excel | |
| elif file_ext in ['.xlsx', '.xls']: | |
| result.update(self._read_excel(file_path)) | |
| # CSV | |
| elif file_ext == '.csv': | |
| result.update(self._read_csv(file_path)) | |
| # JSON | |
| elif file_ext == '.json': | |
| result.update(self._read_json(file_path)) | |
| # Markdown | |
| elif file_ext in ['.md', '.markdown']: | |
| result.update(self._read_markdown(file_path)) | |
| # Texte simple | |
| elif file_ext in ['.txt', '.log', '.py', '.js', '.html', '.css']: | |
| result.update(self._read_text(file_path)) | |
| # Images | |
| elif file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']: | |
| result.update(self._read_image(file_path)) | |
| else: | |
| # Tentative de lecture comme texte | |
| result.update(self._read_text(file_path)) | |
| print(f"✅ Fichier lu avec succès: {len(str(result['content']))} caractères") | |
| return result | |
| except Exception as e: | |
| print(f"❌ Erreur lecture fichier: {e}") | |
| return {"error": str(e), "file_path": file_path} | |
| def _read_pdf(self, file_path: str) -> Dict[str, Any]: | |
| """Lit un fichier PDF""" | |
| try: | |
| with open(file_path, 'rb') as file: | |
| pdf_reader = PyPDF2.PdfReader(file) | |
| text = "" | |
| for page in pdf_reader.pages: | |
| text += page.extract_text() + "\n" | |
| return { | |
| "content": text.strip(), | |
| "metadata": { | |
| "pages": len(pdf_reader.pages), | |
| "title": pdf_reader.metadata.get('/Title', '') if pdf_reader.metadata else '' | |
| } | |
| } | |
| except Exception as e: | |
| return {"error": f"Erreur PDF: {e}"} | |
| def _read_docx(self, file_path: str) -> Dict[str, Any]: | |
| """Lit un fichier Word""" | |
| try: | |
| doc = Document(file_path) | |
| text = "\n".join([paragraph.text for paragraph in doc.paragraphs]) | |
| return { | |
| "content": text, | |
| "metadata": { | |
| "paragraphs": len(doc.paragraphs), | |
| "core_properties": str(doc.core_properties.title) if doc.core_properties.title else "" | |
| } | |
| } | |
| except Exception as e: | |
| return {"error": f"Erreur DOCX: {e}"} | |
| def _read_excel(self, file_path: str) -> Dict[str, Any]: | |
| """Lit un fichier Excel""" | |
| try: | |
| df = pd.read_excel(file_path, sheet_name=None) | |
| content = {} | |
| for sheet_name, sheet_df in df.items(): | |
| content[sheet_name] = { | |
| "data": sheet_df.to_dict('records')[:100], # Limite à 100 lignes | |
| "shape": sheet_df.shape, | |
| "columns": list(sheet_df.columns) | |
| } | |
| return { | |
| "content": content, | |
| "metadata": { | |
| "sheets": list(df.keys()), | |
| "total_sheets": len(df) | |
| } | |
| } | |
| except Exception as e: | |
| return {"error": f"Erreur Excel: {e}"} | |
| def _read_csv(self, file_path: str) -> Dict[str, Any]: | |
| """Lit un fichier CSV""" | |
| try: | |
| df = pd.read_csv(file_path) | |
| return { | |
| "content": { | |
| "data": df.head(100).to_dict('records'), # Premières 100 lignes | |
| "shape": df.shape, | |
| "columns": list(df.columns), | |
| "dtypes": df.dtypes.to_dict() | |
| }, | |
| "metadata": { | |
| "rows": len(df), | |
| "columns": len(df.columns) | |
| } | |
| } | |
| except Exception as e: | |
| return {"error": f"Erreur CSV: {e}"} | |
| def _read_json(self, file_path: str) -> Dict[str, Any]: | |
| """Lit un fichier JSON""" | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as file: | |
| data = json.load(file) | |
| return { | |
| "content": data, | |
| "metadata": { | |
| "type": type(data).__name__, | |
| "size": len(str(data)) | |
| } | |
| } | |
| except Exception as e: | |
| return {"error": f"Erreur JSON: {e}"} | |
| def _read_markdown(self, file_path: str) -> Dict[str, Any]: | |
| """Lit un fichier Markdown""" | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as file: | |
| content = file.read() | |
| html = markdown.markdown(content) | |
| return { | |
| "content": content, | |
| "metadata": { | |
| "html_version": html, | |
| "lines": len(content.split('\n')) | |
| } | |
| } | |
| except Exception as e: | |
| return {"error": f"Erreur Markdown: {e}"} | |
| def _read_text(self, file_path: str) -> Dict[str, Any]: | |
| """Lit un fichier texte""" | |
| try: | |
| # Détection de l'encodage | |
| encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'] | |
| for encoding in encodings: | |
| try: | |
| with open(file_path, 'r', encoding=encoding) as file: | |
| content = file.read() | |
| return { | |
| "content": content, | |
| "metadata": { | |
| "encoding": encoding, | |
| "lines": len(content.split('\n')), | |
| "characters": len(content) | |
| } | |
| } | |
| except UnicodeDecodeError: | |
| continue | |
| return {"error": "Impossible de décoder le fichier"} | |
| except Exception as e: | |
| return {"error": f"Erreur texte: {e}"} | |
| def _read_image(self, file_path: str) -> Dict[str, Any]: | |
| """Lit une image et extrait les métadonnées""" | |
| try: | |
| with Image.open(file_path) as img: | |
| return { | |
| "content": f"Image {img.format} - Taille: {img.size[0]}x{img.size[1]}", | |
| "metadata": { | |
| "format": img.format, | |
| "size": img.size, | |
| "mode": img.mode, | |
| "path": file_path | |
| } | |
| } | |
| except Exception as e: | |
| return {"error": f"Erreur image: {e}"} | |
| # Initialisation et Export des Outils | |
| # On instancie les classes qui contiennent la logique des outils | |
| _web_search_tool_instance = AdvancedWebSearchTool() | |
| _web_scraping_tool_instance = AdvancedWebScrapingTool() | |
| _file_reader_tool_instance = FileReaderTool() | |
| # On définit des fonctions "wrapper" autonomes et on les décore avec @tool | |
| def search_web(query: str, max_results: int = 10, region: str = "fr-fr") -> List[Dict[str, str]]: | |
| """ | |
| Recherche avancée sur le web avec DuckDuckGo pour trouver des informations à jour. | |
| Args: | |
| query: Requête de recherche. | |
| max_results: Nombre maximum de résultats à retourner. | |
| region: Région de recherche (ex: 'fr-fr', 'wt-wt'). | |
| Returns: | |
| Liste de résultats de recherche, chacun contenant un titre, une URL et un snippet. | |
| """ | |
| return _web_search_tool_instance.search_web(query, max_results=max_results, region=region) | |
| def scrape_website(url: str, extract_text: bool = True, extract_links: bool = False) -> Dict[str, Any]: | |
| """ | |
| Scrape une page web pour en extraire le contenu textuel principal, le titre et les liens. | |
| Args: | |
| url: URL de la page web à scraper. | |
| extract_text: Si True, extrait le texte principal de la page. | |
| extract_links: Si True, extrait les liens hypertextes de la page. | |
| Returns: | |
| Un dictionnaire contenant le contenu extrait du site. | |
| """ | |
| return _web_scraping_tool_instance.scrape_website(url, extract_text=extract_text, extract_links=extract_links) | |
| def read_file(file_path: str) -> Dict[str, Any]: | |
| """ | |
| Lit un fichier local et en extrait le contenu. Gère divers formats (PDF, DOCX, XLSX, CSV, JSON, TXT, etc.). | |
| Args: | |
| file_path: Chemin d'accès local au fichier à lire. | |
| Returns: | |
| Un dictionnaire contenant le contenu du fichier et ses métadonnées. | |
| """ | |
| print(f"📄 Tentative de lecture du fichier: {file_path}") | |
| # On vérifie si le fichier existe AVANT de continuer. | |
| if not os.path.exists(file_path): | |
| # Message d'erreur très clair pour l'agent | |
| error_message = ( | |
| f"ERROR: The file '{file_path}' was not found. " | |
| "I cannot access local files. I must find the information using other tools like search_web." | |
| ) | |
| print(f"❌ {error_message}") | |
| # On retourne le dictionnaire d'erreur et on arrête la fonction ici. | |
| return {"error": error_message} | |
| # Si le fichier existe, alors on appelle la logique de lecture. | |
| return _file_reader_tool_instance.read_file(file_path) | |
| def get_youtube_transcript(video_url: str) -> Dict[str, Any]: | |
| """ | |
| Récupère la transcription d'une vidéo YouTube à partir de son URL. | |
| Args: | |
| video_url (str): L'URL de la vidéo. | |
| Returns: | |
| Un dictionnaire avec la transcription ou une erreur. | |
| """ | |
| try: | |
| # ... (la logique d'extraction de video_id reste la même) ... | |
| video_id = None | |
| if "v=" in video_url: | |
| video_id = video_url.split("v=")[1].split('&')[0] | |
| elif video_url.startswith("http://googleusercontent.com/youtube.com/"): | |
| video_id = video_url.split('/')[-1] | |
| if not video_id: | |
| return {"error": "ID de vidéo non trouvé."} | |
| # On instancie la classe avant d'appeler la méthode. | |
| # Cela peut contourner les problèmes de chargement de méthodes statiques. | |
| api = YouTubeTranscriptApi() | |
| transcript_list = api.get_transcript(video_id, languages=['en', 'fr']) | |
| transcript_text = " ".join([item['text'] for item in transcript_list]) | |
| return {"status": "success", "transcript": transcript_text[:15000]} | |
| except Exception as e: | |
| return {"error": str(e)} |