Spaces:
Sleeping
Sleeping
| from fastapi.staticfiles import StaticFiles | |
| import requests, re, warnings | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, StreamingResponse | |
| from bs4 import BeautifulSoup | |
| from huggingface_hub import configure_http_backend | |
| from schemas import * | |
| from classes import * | |
def backend_factory() -> requests.Session:
    """Build the HTTP session used by huggingface_hub downloads.

    NOTE(review): TLS certificate verification is disabled (verify=False),
    presumably to work behind an intercepting corporate proxy — confirm
    this is intentional before deploying outside that environment.
    """
    s = requests.Session()
    s.verify = False  # skip certificate checks on every hub request
    return s
# Route all huggingface_hub HTTP traffic through the unverified session above.
configure_http_backend(backend_factory=backend_factory)
# Silence all warnings globally — notably urllib3's InsecureRequestWarning
# triggered by verify=False on every request.
warnings.filterwarnings("ignore")
# Load environment variables from a local .env file, if present.
load_dotenv()
# 3GPP TSG category -> ordered list of folder names under https://www.3gpp.org/ftp.
# Index 0 is the plenary (TSG_*) folder; index N is Working Group N's folder,
# so meetings_mapping[category][wg_number] resolves the (category, number)
# pair produced by get_folder_name() to its FTP directory name.
meetings_mapping = {
    "SA": [
        "TSG_SA",
        "WG1_Serv",
        "WG2_Arch",
        "WG3_Security",
        "WG4_CODEC",
        "WG5_TM",
        "WG6_MissionCritical"
    ],
    "CT": [
        "TSG_CT",
        "WG1_mm-cc-sm_ex-CN1",
        "WG2_capability_ex-T2",
        "WG3_interworking_ex-CN3",
        "WG4_protocollars_ex-CN4",
        "WG5_osa_ex-CN5",
        "WG6_Smartcard_Ex-T3"
    ],
    "RAN": [
        "TSG_RAN",
        "WG1_RL1",
        "WG2_RL2",
        "WG3_Iu",
        "WG4_Radio",
        "WG5_Test_ex-T1",
        "WG6_legacyRAN"
    ]
}
# Singleton indexers shared by all endpoints for the lifetime of the process.
tdoc_indexer = TDocIndexer()
spec_3gpp_indexer = Spec3GPPIndexer()
spec_etsi_indexer = SpecETSIIndexer()
app = FastAPI()
# Wide-open CORS: any origin, any header, credentials allowed.
# NOTE(review): allow_origins=["*"] with allow_credentials=True is very
# permissive — confirm this is acceptable for the deployment.
app.add_middleware(CORSMiddleware, allow_credentials=True, allow_headers=["*"], allow_origins=["*"])
# Serve frontend assets from the local ./static directory at /static.
app.mount("/static", StaticFiles(directory="static"), name="static")
def main():
    """Serve the single-page frontend (index.html).

    NOTE(review): no route decorator is visible on this handler — an
    @app.get("/") (and likewise on the other handlers below) was most
    likely lost when this file was extracted; confirm against the repo.
    """
    return FileResponse("index.html")
def get_folder_name(working_group: str):
    """Split a working-group label into a ``(category, wg_number)`` pair.

    Plenary labels "SP"/"CP"/"RP" map to index 0 of their category
    ("SA"/"CT"/"RAN"); any other label must start with an uppercase
    category code followed by digits, e.g. "SA2" -> ("SA", 2).

    Args:
        working_group: working-group label such as "SP", "CP", "RP" or "SA2".

    Returns:
        Tuple of (category code, working-group number) used to index
        ``meetings_mapping``.

    Raises:
        ValueError: if the label matches neither recognised form.
    """
    # Plenary groups end in "P"; the leading letter picks the category.
    if working_group.endswith("P"):
        category = {"S": "SA", "C": "CT", "R": "RAN"}.get(working_group[:1])
        if category is not None:
            return (category, 0)
    # Working groups: uppercase category code followed by the WG number.
    m = re.match(r"([A-Z]+)(\d+)", working_group)
    if m:
        code, num = m.groups()
        return (code, int(num))
    # Original message read "Unattended format" — garbled; be explicit instead.
    raise ValueError(f"Unexpected working group format: {working_group!r}")
def get_meetings(working_group: str):
    """Scrape the 3GPP FTP listing and return meeting folder names for a WG."""
    category, wg_number = get_folder_name(working_group)
    wg_folder = meetings_mapping[category][wg_number]
    url = f"https://www.3gpp.org/ftp/{meetings_mapping[category][0]}/{wg_folder}"
    # verify=False mirrors the session policy used elsewhere in this module.
    page = BeautifulSoup(requests.get(url, verify=False).text, "html.parser")
    names = []
    for link in page.select("tr td a"):
        label = link.get_text()
        # Meeting folders are the entries named TSG* or CT*.
        if label.startswith("TSG") or label.startswith("CT"):
            names.append(label)
    return {"url": url, "meetings": names}
def index_tdocs_wg_progress(req: IndexTDoc):
    """Stream SSE progress events while indexing every meeting of one WG.

    Raises:
        HTTPException: 400 when the request carries no working group.
    """
    if not req.wg:
        raise HTTPException(status_code=400, detail="Working Group not defined !")
    category, wg_number = get_folder_name(req.wg)
    wg_folder = meetings_mapping[category][wg_number]
    base_url = f"https://www.3gpp.org/ftp/{meetings_mapping[category][0]}"
    def event_stream():
        # Announce the WG, relay the indexer's own SSE events, then persist.
        yield f"event: info\ndata: {req.wg}\n\n"
        yield from tdoc_indexer.process_workgroup(wg_folder, base_url)
        tdoc_indexer.save_indexer()
        yield "event: end\ndata: Indexation ended successfully !\n\n"
    return StreamingResponse(event_stream(), media_type="text/event-stream")
def index_tdocs_meeting_progress(req: IndexTDoc):
    """Stream SSE progress events while indexing an explicit list of meetings.

    Raises:
        HTTPException: 400 when the working group or meeting list is missing.
    """
    if not req.wg:
        raise HTTPException(status_code=400, detail="Working Group not defined !")
    if not req.meetings:
        raise HTTPException(status_code=400, detail="Meetings not defined !")
    category, wg_number = get_folder_name(req.wg)
    wg_folder = meetings_mapping[category][wg_number]
    base_url = f"https://www.3gpp.org/ftp/{meetings_mapping[category][0]}/{wg_folder}"
    def event_stream():
        # Tell the client how many meetings to expect, then report 1-based progress.
        yield f"event: get-maximum\ndata: {len(req.meetings)}\n\n"
        for done, meeting in enumerate(req.meetings, start=1):
            yield f"event: info\ndata: {req.wg}-{meeting}\n\n"
            tdoc_indexer.process_meeting(meeting, base_url)
            yield f"event: progress\ndata: {done}\n\n"
        tdoc_indexer.save_indexer()
        yield "event: end\ndata: Indexation ended successfully !\n\n"
    return StreamingResponse(event_stream(), media_type="text/event-stream")
def index_all_tdocs_progress():
    """Stream SSE progress events while indexing TDocs across all groups."""
    def event_stream():
        # Relay the indexer's SSE events, persist, then signal completion.
        yield from tdoc_indexer.index_all_tdocs()
        tdoc_indexer.save_indexer()
        yield "event: end\ndata: Indexation ended successfully !\n\n"
    return StreamingResponse(event_stream(), media_type="text/event-stream")
def index_3gpp_specs_progress():
    """Stream SSE progress events while indexing 3GPP specifications."""
    def one_shot_step(label):
        # One-item progress bar (max 1 -> progress 1) announcing a blocking step.
        yield f"event: info\ndata: {label}\n\n"
        yield "event: get-maximum\ndata: 1\n\n"
        yield "event: progress\ndata: 1\n\n"
    def event_stream():
        yield from spec_3gpp_indexer.run()
        yield from one_shot_step("Saving index ...")
        spec_3gpp_indexer.save()
        yield from one_shot_step("Creating BM25 models ...")
        spec_3gpp_indexer.create_bm25_index()
        yield "event: end\ndata: Indexation ended successfully !\n\n"
    return StreamingResponse(event_stream(), media_type="text/event-stream")
def index_etsi_specs_progress():
    """Stream SSE progress events while indexing ETSI specifications."""
    def one_shot_step(label):
        # One-item progress bar (max 1 -> progress 1) announcing a blocking step.
        yield f"event: info\ndata: {label}\n\n"
        yield "event: get-maximum\ndata: 1\n\n"
        yield "event: progress\ndata: 1\n\n"
    def event_stream():
        yield from spec_etsi_indexer.run()
        yield from one_shot_step("Saving index ...")
        spec_etsi_indexer.save()
        yield from one_shot_step("Creating BM25 models ...")
        spec_etsi_indexer.create_bm25_index()
        yield "event: end\ndata: Indexation ended successfully !\n\n"
    return StreamingResponse(event_stream(), media_type="text/event-stream")