import asyncio
from aiolimiter import AsyncLimiter
from pathlib import Path
import traceback
from typing import Literal
from fastapi.routing import APIRouter
import logging
import io
import zipfile
import os
from httpx import AsyncClient
from pydantic import BaseModel
import subprocess
import pandas as pd
import re
import tempfile
from lxml import etree
from bs4 import BeautifulSoup
from fastapi import Depends, HTTPException
from dependencies import get_http_client, get_llm_router
from fastapi.responses import StreamingResponse
from litellm.router import Router
from kreuzberg import ExtractionConfig, extract_bytes
from schemas import DocInfo, GetMeetingDocsRequest, GetMeetingDocsResponse, DocRequirements, DownloadDocsRequest, GetMeetingsRequest, GetMeetingsResponse, ExtractRequirementsRequest, ExtractRequirementsResponse

# API router for requirement extraction from docs / doc list retrieval / download
router = APIRouter(tags=["document extraction"])
# ==================================================== Utilities =================================================================

NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml'
}

# ================================== Conversion of files to .txt ====================================

KREUZBERG_CONFIG: ExtractionConfig = ExtractionConfig(
    force_ocr=False, ocr_backend=None)

# Unfortunately this must be kept at 1, as LibreOffice isn't built to support parallel instances
LO_CONVERSION_MUTEX = asyncio.Semaphore(1)

async def convert_file(contents: io.BytesIO, filename: str, input_ext: str, output_ext: str, filter: str | None = None) -> io.BytesIO:
    """
    Converts the given file bytes to the specified file type using headless LibreOffice.
    This is an asynchronous version.

    Args:
        contents: File contents
        filename: File base name WITHOUT THE EXTENSION
        input_ext: Input extension (WITHOUT THE DOT)
        output_ext: Output extension (WITHOUT THE DOT)
        filter: The LibreOffice conversion filter to use, if any.
    """
    # hold the mutex for the whole conversion so it is released even if the conversion fails
    async with LO_CONVERSION_MUTEX:
        with tempfile.TemporaryDirectory() as tmpdir:
            dir_path = Path(tmpdir)
            input_file_path = dir_path / f"{filename}.{input_ext}"
            output_file_path = dir_path / f"{filename}.{output_ext}"
            # write the in-memory contents to the input file
            with open(input_file_path, "wb") as in_file:
                in_file.write(contents.read())
            out_bytes = io.BytesIO()
            # construct the command
            command = [
                "libreoffice",
                "--headless",
                "--convert-to", f"{output_ext}:{filter}" if filter else output_ext,
                "--outdir", tmpdir,
                str(input_file_path)  # Ensure path is a string for subprocess
            ]
            # run the conversion with LibreOffice asynchronously
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await process.communicate()
            exit_code = process.returncode
            # LibreOffice sometimes exits non-zero even when the output file was produced,
            # so only fail if the output file is actually missing
            if exit_code != 0 and not output_file_path.exists():
                raise subprocess.CalledProcessError(
                    exit_code,
                    command,
                    output=stdout,
                    stderr=stderr
                )
            with open(output_file_path, mode="rb") as out:
                out_bytes.write(out.read())
            out_bytes.seek(0)
            return out_bytes
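
# Usage sketch (illustrative only, never called by this module; the base name
# "example_tdoc" and the in-memory .doc input are hypothetical):
async def _example_convert_doc_to_docx(raw_doc: io.BytesIO) -> io.BytesIO:
    # converts a legacy .doc held in memory into a .docx, serialised through the
    # single LibreOffice instance guarded by LO_CONVERSION_MUTEX
    return await convert_file(raw_doc, "example_tdoc", "doc", "docx")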

# Rate limit of FTP downloads per minute
FTP_DOWNLOAD_RATE_LIMITER = AsyncLimiter(max_rate=60, time_period=60)
# Max number of parallel download workers
FTP_MAX_PARALLEL_WORKERS = asyncio.Semaphore(4)

async def get_doc_archive(url: str, client: AsyncClient) -> tuple[str, str, io.BytesIO]:
    """Fetches the document archive at the given URL and returns a (name, extension, contents) tuple"""
    async with FTP_DOWNLOAD_RATE_LIMITER:
        async with FTP_MAX_PARALLEL_WORKERS:
            if not url.endswith("zip"):
                raise ValueError("URL must point to a ZIP file")
            doc_id = os.path.splitext(os.path.basename(url))[0]
            resp = await client.get(url, headers={
                "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            })
            resp.raise_for_status()
            with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
                # there should be a single file per archive
                for entry in zf.infolist():
                    if entry.is_dir():
                        continue
                    file_name = entry.filename
                    root, ext = os.path.splitext(file_name)
                    doc_bytes = zf.read(file_name)
                    return (root, ext.lower(), io.BytesIO(doc_bytes))
            raise ValueError("No file found in the archive")

def apply_docx_revisions(docx_zip: zipfile.ZipFile) -> io.BytesIO:
    """
    Applies the tracked revisions of a .docx before returning its contents.

    Args:
        docx_zip: The Word document, opened as a zip archive
    """
    try:
        xml_bytes = docx_zip.read('word/document.xml')
    except KeyError:
        raise FileNotFoundError(
            "word/document.xml not found in the DOCX archive.")
    parser = etree.XMLParser(remove_blank_text=True)
    root = etree.fromstring(xml_bytes, parser=parser)
    # Remove <w:del> elements and their contents (accept deletions)
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)
    # Unwrap <w:ins> elements so the inserted text is kept (accept insertions)
    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        if parent is not None:
            index = parent.index(ins_elem)
            # materialize the children first: moving a child reparents it,
            # which would otherwise break the live iterator
            for child in list(ins_elem):
                parent.insert(index, child)
                index += 1
            parent.remove(ins_elem)
    # Strip comment markers
    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)
    # Create a new docx with the modified XML
    output = io.BytesIO()
    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        # Copy all unmodified files
        for file_info in docx_zip.infolist():
            if file_info.filename != 'word/document.xml':
                new_zip.writestr(file_info, docx_zip.read(file_info.filename))
        # Add the modified document.xml
        xml_str = etree.tostring(
            root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True
        )
        new_zip.writestr('word/document.xml', xml_str)
    output.seek(0)
    return output
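
# Usage sketch (illustrative only, never called by this module; "draft_cr.docx"
# is a hypothetical local file, not something this service reads from disk):
def _example_accept_revisions() -> None:
    with zipfile.ZipFile("draft_cr.docx") as docx_zip:
        clean_docx = apply_docx_revisions(docx_zip)
    with open("draft_cr_clean.docx", "wb") as out:
        out.write(clean_docx.read())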

FORMAT_MIME_TYPES = {
    ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    ".pdf": "application/pdf",
    ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation"
}

async def doc_to_txt(doc_id: str, url: str, client: AsyncClient) -> list[str]:
    """
    Downloads the specified TDoc and converts it to a list of text lines.
    """
    # Grab the document archive
    filename, ext, contents = await get_doc_archive(url, client)
    final_text: str | None = None
    if ext == ".doc":
        logging.debug(f"Converting {filename} .doc --> .docx")
        docx_bytes = await convert_file(contents, doc_id, "doc", "docx")
        extracted_data = await extract_bytes(docx_bytes.read(), FORMAT_MIME_TYPES[".docx"], config=KREUZBERG_CONFIG)
        final_text = extracted_data.content
    elif ext == ".docx":
        # Apply tracked revisions to docx files (especially for pCR / draftCR files)
        logging.debug(f"Updating .docx revisions for {doc_id}.")
        applied_revision = apply_docx_revisions(zipfile.ZipFile(contents))
        extracted_data = await extract_bytes(applied_revision.read(), FORMAT_MIME_TYPES[".docx"], config=KREUZBERG_CONFIG)
        final_text = extracted_data.content
    elif ext == ".ppt":
        logging.debug(f"Converting {filename} .ppt --> .pptx")
        pptx_bytes = await convert_file(contents, doc_id, "ppt", "pptx")
        extracted_data = await extract_bytes(pptx_bytes.read(), FORMAT_MIME_TYPES[".pptx"], config=KREUZBERG_CONFIG)
        final_text = extracted_data.content
    elif ext in FORMAT_MIME_TYPES:  # file extension is supported directly
        extracted_data = await extract_bytes(contents.read(), FORMAT_MIME_TYPES[ext], config=KREUZBERG_CONFIG)
        final_text = extracted_data.content
    else:
        raise ValueError(
            f"Unsupported file type: {ext}, filename: {filename}")
    # include an empty line at the beginning
    txt_data = ["\n"] + [line.strip()
                         for line in final_text.splitlines() if line.strip()]
    return txt_data
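
# Usage sketch (illustrative only, never called by this module; the TDoc ID and
# URL are made-up placeholders):
async def _example_doc_to_txt() -> None:
    async with AsyncClient() as client:
        lines = await doc_to_txt(
            "S2-0000000", "https://example.invalid/tdocs/S2-0000000.zip", client)
        # the first element is the leading empty line added by doc_to_txt
        logging.info("Extracted %d non-empty lines", len(lines) - 1)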

# ============================================= Doc routes =========================================================

async def get_meetings(req: GetMeetingsRequest, http_client: AsyncClient = Depends(get_http_client)):
    """
    Retrieves the list of meetings for the given working group.
    """
    # Extracting the WG
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    # building the corresponding FTP url
    logging.debug(f"TSG: {tsg}, WG number: {wg_number}")
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.debug(url)
    ftp_request = await http_client.get(url)
    soup = BeautifulSoup(ftp_request.text, "html.parser")
    meeting_folders = []
    all_meetings = []
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    # sanity check to ensure the requested working group is present in the FTP directories
    for folder in wg_folders:
        if "wg" + str(wg_number) in folder.lower():
            selected_folder = folder
            break
    if selected_folder:
        url += "/" + selected_folder
        logging.debug(url)
        resp = await http_client.get(url)
        soup = BeautifulSoup(resp.text, "html.parser")
        meeting_folders = [
            item.get_text() for item in soup.select("tr td a")
            if item.get_text().startswith("TSG")
            or (item.get_text().startswith("CT") and "-" in item.get_text())
        ]
        all_meetings = [
            working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ")
            if meeting.startswith("TSG") else meeting.replace("-", "#")
            for meeting in meeting_folders
        ]
    return GetMeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))
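
# Worked example of the working-group parsing above (illustrative only, never
# called by this module; the value "SA2" is a hypothetical request input):
def _example_split_working_group() -> tuple[str, str]:
    working_group = "SA2"
    tsg = re.sub(r"\d+", "", working_group)               # -> "SA"
    wg_number = re.search(r"\d", working_group).group(0)  # -> "2"
    return tsg, wg_number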

# ============================================================================================================================================

async def get_meeting_docs(req: GetMeetingDocsRequest, http_client: AsyncClient = Depends(get_http_client)) -> GetMeetingDocsResponse:
    """
    Downloads the document list dataframe for a given meeting
    """
    # FIXME: extract the document URLS from the hyperlinks in the excelsheet using openpyxl?
    # Extracting the WG
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.info("Fetching TDocs dataframe")
    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if "wg" + str(wg_number) in folder.lower():
            selected_folder = folder
            break
    if selected_folder is None:
        raise HTTPException(
            status_code=404, detail=f"No FTP folder found for working group {working_group}")
    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [item.get_text() for item in soup.select("tr td a")
             if item.get_text().endswith(".xlsx")]
    if files == []:
        raise HTTPException(status_code=404, detail="No XLSX has been found")
    df = pd.read_excel(str(url + "/" + files[0]).replace("#", "%23"))
    filtered_df = df[~(df["Uploaded"].isna())][[
        "TDoc", "Title", "CR category", "For", "Source", "Type", "Agenda item", "Agenda item description", "TDoc Status"]]
    filtered_df["URL"] = filtered_df["TDoc"].apply(
        lambda tdoc: f"{url}/{tdoc}.zip")
    df = filtered_df.fillna("")
    return GetMeetingDocsResponse(data=df[["TDoc", "Title", "Type", "For", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records"))

# ==================================================================================================================================

async def download_docs(req: DownloadDocsRequest, http_client: AsyncClient = Depends(get_http_client)) -> StreamingResponse:
    """Downloads the specified TDocs and zips them into a single archive"""
    # Document IDs to download
    document_ids = [doc.document for doc in req.documents]
    logging.info(f"Downloading TDocs: {document_ids}")

    # quick helper for normalizing agenda item names into safe directory names
    def __normalize_for_path(text: str) -> str:
        if not text:
            return "_unspecified_agenda_item"
        text = re.sub(r'\s+', '_', text)
        text = re.sub(r'[^\w\s-]', '', text).strip()
        return text if text else "_unspecified_agenda_item"

    async def _process_single_document(item: DocInfo) -> dict:
        """Attempts to convert a document to text and returns a dict with the document ID, its content and a failure flag."""
        try:
            text_lines = await doc_to_txt(item.document, item.url, http_client)
            content_bytes = "\n".join(text_lines).encode("utf-8")
            return {"doc_id": item.document, "content": content_bytes, "agenda_item": item.agenda_item}
        except Exception as e:
            logging.warning(
                f"Failed to process document '{item.document}' from URL '{item.url}': {e}")
            error_message = f"Document '{item.document}' text extraction failed: {e}".encode(
                "utf-8")
            return {"doc_id": item.document, "content": error_message, "failed": True, "agenda_item": item.agenda_item}

    convert_tasks = await asyncio.gather(*[_process_single_document(doc) for doc in req.documents], return_exceptions=False)
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
        for task in convert_tasks:
            # get the agenda item directory
            agenda_item_str = task.get("agenda_item") or ""
            directory_name = __normalize_for_path(agenda_item_str)
            failed = "failed" in task
            doc_id = task["doc_id"]
            base_filename = f"failed_{doc_id}.txt" if failed else f"{doc_id}.txt"
            # sort into agenda item directories if enabled
            full_file_path = f"{directory_name}/{base_filename}" if req.sort_by_agenda_item else base_filename
            zip_file.writestr(full_file_path, task["content"])
    zip_buffer.seek(0)
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=tdocs.zip"}
    )

# ======================================================================================================================================================================================

class ProgressUpdate(BaseModel):
    """Defines the structure of a single SSE message."""
    status: Literal["progress", "complete"]
    data: dict
    total_docs: int
    processed_docs: int
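
# Sketch of how a ProgressUpdate is serialized into a single SSE event by the
# stream generator below (illustrative only; the counts are made-up values):
def _example_sse_event() -> str:
    update = ProgressUpdate(status="progress", data={}, total_docs=12, processed_docs=3)
    # -> 'data: {"status":"progress","data":{},"total_docs":12,"processed_docs":3}\n\n'
    return f"data: {update.model_dump_json()}\n\n"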

async def extract_requirements_from_docs(req: ExtractRequirementsRequest, llm_router: Router = Depends(get_llm_router), http_client: AsyncClient = Depends(get_http_client)):
    """Extracts requirements from the specified xxxxCR docs using an LLM and returns SSE events reporting the progress of the ongoing operations"""
    documents = req.documents
    n_docs = len(documents)
    logging.info(
        "Generating requirements for documents: {}".format(req.documents))
    # limit the concurrency of LLM requests to avoid piling up errors due to small rate limits
    concurrency_sema = asyncio.Semaphore(4)

    def prompt(doc_id, full):
        return f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found. Remove the errors"

    async def _process_document(doc) -> list[DocRequirements]:
        doc_id = doc.document
        url = doc.url
        # convert the document to text for use in the prompt
        try:
            lines = await doc_to_txt(doc_id, url, http_client)
            full = "\n".join(lines)
        except Exception as e:
            fmt = "".join(traceback.format_exception(e))
            logging.error(f"Failed to process doc {doc_id} : {fmt}")
            return [DocRequirements(document=doc_id, context="Failed to process document", requirements=[])]
        try:
            async with concurrency_sema:
                model_used = "gemini-v2"
                resp_ai = await llm_router.acompletion(
                    model=model_used,
                    messages=[
                        {"role": "user", "content": prompt(doc_id, full)}],
                    response_format=ExtractRequirementsResponse
                )
                return ExtractRequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements
        except Exception as e:
            logging.error(f"LLM extraction failed for doc {doc_id}: {e}")
            return [DocRequirements(document=doc_id, context="Error LLM", requirements=[])]

    # coroutines for all documents to process
    process_futures = [_process_document(doc) for doc in documents]

    # helper to format a progress update as an SSE event
    def progress_update(x: ProgressUpdate) -> str:
        return f"data: {x.model_dump_json()}\n\n"

    # async generator that yields the SSE events for progress
    async def _stream_generator(docs):
        items = []
        n_processed = 0
        yield progress_update(ProgressUpdate(status="progress", data={}, total_docs=n_docs, processed_docs=0))
        for doc in asyncio.as_completed(docs):
            result = await doc
            items.extend(result)
            n_processed += 1
            yield progress_update(ProgressUpdate(status="progress", data={}, total_docs=n_docs, processed_docs=n_processed))
        final_response = ExtractRequirementsResponse(requirements=items)
        yield progress_update(ProgressUpdate(status="complete", data=final_response.model_dump(), total_docs=n_docs, processed_docs=n_processed))

    return StreamingResponse(_stream_generator(process_futures), media_type="text/event-stream")
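
# Client-side sketch of consuming the SSE stream produced above (illustrative
# only, never called by this module; the service URL and request payload are
# hypothetical placeholders supplied by the caller):
async def _example_consume_sse(url: str, payload: dict) -> None:
    async with AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json=payload) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    update = ProgressUpdate.model_validate_json(line[len("data: "):])
                    logging.info("Processed %d/%d documents",
                                 update.processed_docs, update.total_docs)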