Spaces:
Runtime error
Runtime error
| import os | |
| import io | |
| import torch | |
| import uvicorn | |
| import spacy | |
| import pdfplumber | |
| import ffmpeg # β Replaced moviepy with ffmpeg-python | |
| import librosa | |
| import soundfile as sf | |
| import subprocess | |
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer | |
| from sentence_transformers import SentenceTransformer, util | |
# Suppress warnings: TF_CPP_MIN_LOG_LEVEL=3 quiets TensorFlow's native logging,
# and CUDA_VISIBLE_DEVICES=0 exposes only the first CUDA device to this process.
os.environ.update(
    {
        "TF_CPP_MIN_LOG_LEVEL": "3",
        "CUDA_VISIBLE_DEVICES": "0",
    }
)

# Use the GPU when torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
# Load the spaCy English pipeline, downloading it on first use if it is missing.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # spacy.load raised OSError: treat it as "model not installed" and fetch it.
    import sys

    # Run the download with the interpreter that is executing this module
    # (not whatever "python" resolves to on PATH) so the model lands in the
    # environment that will import it. ``--user`` is kept from the original to
    # avoid permission errors on read-only site-packages. ``check=True`` makes
    # a failed download surface immediately instead of as a second, less
    # informative OSError from the retry below.
    subprocess.run(
        [sys.executable, "-m", "spacy", "download", "en_core_web_sm", "--user"],
        check=True,
    )
    nlp = spacy.load("en_core_web_sm")
# Load NLP models: summarizer, sentence embeddings, NER, and speech-to-text.
try:
    # transformers pipelines take a device index: 0 for the first GPU, -1 for CPU.
    _pipeline_device = 0 if torch.cuda.is_available() else -1

    summarizer = pipeline(
        "summarization",
        model="nsi319/legal-pegasus",
        device=_pipeline_device,
    )
    embedding_model = SentenceTransformer("all-mpnet-base-v2", device=device)
    ner_model = pipeline(
        "ner",
        model="dslim/bert-base-NER",
        tokenizer="dslim/bert-base-NER",
        device=_pipeline_device,
    )
    speech_to_text = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-base",
        device=_pipeline_device,
    )
except Exception as load_error:
    # Any failure while loading aborts startup with a single descriptive error.
    raise RuntimeError(f"Error loading models: {load_error!s}")
# Load Falcon 7B Instruct (tokenizer + causal language model) for the chatbot.
MODEL_NAME = "tiiuae/falcon-7b-instruct"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
chatbot_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",  # let the library decide where the weights are placed
    torch_dtype=torch.bfloat16,  # load the weights in bfloat16
)

# Initialize the FastAPI application that uvicorn serves at the bottom of the file.
app = FastAPI()
# PDF Text Extraction
def extract_text_from_pdf(pdf_file):
    """Return the text of every page of a PDF, joined with newlines.

    Args:
        pdf_file: Passed straight to ``pdfplumber.open``; the caller in this
            file supplies an ``io.BytesIO`` holding the uploaded bytes.

    Returns:
        The concatenated page text. Pages for which ``extract_text()`` yields
        nothing contribute an empty string.

    Raises:
        HTTPException: status 400 when opening or parsing fails, or when the
            document yields no non-whitespace text.
    """
    try:
        with pdfplumber.open(pdf_file) as document:
            page_texts = []
            for page in document.pages:
                # extract_text() may return None; normalise that to "".
                page_texts.append(page.extract_text() or "")
        combined = "\n".join(page_texts)
        if combined.strip():
            return combined
        raise ValueError("No readable text found in PDF. It may be a scanned document.")
    except Exception as err:
        # Every failure, including the ValueError above, is reported as a 400.
        raise HTTPException(status_code=400, detail=f"PDF extraction failed: {str(err)}")
# Video-to-Audio Extraction (using FFmpeg instead of MoviePy)
def extract_audio_from_video(video_path):
    """Extract the audio track of a video into a WAV file next to it.

    The output path is the input path with its final extension replaced by
    ``.wav``. This works for any container or extension (``.mov``, ``.MP4``,
    no extension at all) and never touches other parts of the path.

    Args:
        video_path: Path of the media file to read.

    Returns:
        Path of the WAV file that FFmpeg wrote.

    Raises:
        HTTPException: status 500 when FFmpeg (or anything else here) fails.
    """
    try:
        stem = os.path.splitext(video_path)[0]
        audio_path = f"{stem}.wav"
        if audio_path == video_path:
            # The input is already "<stem>.wav": pick a different name so
            # FFmpeg is never asked to overwrite the file it is reading.
            audio_path = f"{stem}_audio.wav"
        ffmpeg.input(video_path).output(audio_path, format="wav").run(overwrite_output=True)
        return audio_path
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Audio extraction failed: {str(e)}") from e
# Speech-to-Text Transcription (chunked for long audio)
def transcribe_audio(audio_path):
    """Transcribe an audio file with the module-level ``speech_to_text`` pipeline.

    The audio is loaded at 16 kHz. Recordings of 30 seconds or less are sent to
    the pipeline as-is (by path). Longer recordings are cut into consecutive
    30-second chunks that are transcribed one by one and joined with spaces.

    Chunk files live in a private temporary directory, so concurrent calls
    cannot overwrite each other's chunks and nothing is left behind when a
    chunk fails to transcribe.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The transcript text.

    Raises:
        HTTPException: status 500 when loading, chunking or transcription fails.
    """
    import tempfile

    try:
        audio, sr = librosa.load(audio_path, sr=16000)
        duration = len(audio) / sr

        if duration <= 30:
            result = speech_to_text(audio_path)
            return result["text"]

        chunk_size = 30 * sr  # samples per 30-second chunk
        transcripts = []
        # The directory and every chunk in it are removed on exit, even on error.
        with tempfile.TemporaryDirectory() as chunk_dir:
            for idx, start in enumerate(range(0, len(audio), chunk_size)):
                chunk_path = os.path.join(chunk_dir, f"temp_chunk_{idx}.wav")
                sf.write(chunk_path, audio[start:start + chunk_size], sr)
                result = speech_to_text(chunk_path)
                transcripts.append(result["text"])
        return " ".join(transcripts)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Speech-to-text failed: {str(e)}") from e
# Legal Document Summarization
async def summarize_legal_document(text):
    """Summarize a legal document with the module-level ``summarizer`` pipeline.

    Only the first 1024 characters of ``text`` are passed to the summarizer,
    with ``min_length=50``, ``max_length=200`` and sampling disabled.

    Args:
        text: Full document text.

    Returns:
        The generated summary, or the fixed string
        ``"Summarization failed due to an internal error."`` when anything goes
        wrong. This coroutine never raises; failures are logged with their
        traceback so they can be diagnosed.
    """
    import logging

    try:
        summary = summarizer(text[:1024], max_length=200, min_length=50, do_sample=False)
        return summary[0]["summary_text"]
    except Exception:
        # Best-effort by design: keep the fallback string for callers, but
        # record why it happened instead of discarding the exception.
        logging.getLogger(__name__).exception("Legal document summarization failed")
        return "Summarization failed due to an internal error."
# Legal Document Analysis API
# NOTE(review): no route decorator is visible on this coroutine; confirm it is
# registered on ``app`` somewhere, otherwise it is never reachable over HTTP.
async def analyze_legal_document(file: UploadFile = File(...)):
    """Extract the text of an uploaded PDF and summarize it.

    Args:
        file: The uploaded PDF.

    Returns:
        ``{"status": "success", "summary": <summary>}`` on success, or
        ``{"status": "error", "detail": <message>}`` for unexpected failures.

    Raises:
        HTTPException: propagated unchanged from the helpers (for example the
            400 raised for an unreadable PDF) so the client receives the
            intended HTTP status instead of a 200 response with an error body.
    """
    try:
        content = await file.read()
        text = extract_text_from_pdf(io.BytesIO(content))
        summary = await summarize_legal_document(text)
        return {"status": "success", "summary": summary}
    except HTTPException:
        # Deliberate HTTP errors must keep their status code.
        raise
    except Exception as e:
        return {"status": "error", "detail": str(e)}
# Chatbot API
# NOTE(review): no route decorator is visible on this coroutine; confirm it is
# registered on ``app`` somewhere, otherwise it is never reachable over HTTP.
async def chatbot_endpoint(query: dict):
    """Answer a chatbot query with the Falcon 7B model.

    Args:
        query: Request payload; the prompt is read from its ``"query"`` key.

    Returns:
        ``{"status": "success", "answer": <decoded model output>}`` on success,
        or ``{"status": "error", "message": <message>}`` when tokenization or
        generation fails.

    Raises:
        HTTPException: status 400 when the ``"query"`` value is missing or empty.
    """
    input_text = query.get("query", "")
    if not input_text:
        # Validate before entering the try block so the 400 reaches the client
        # instead of being swallowed by the generic handler below.
        raise HTTPException(status_code=400, detail="Query cannot be empty.")

    try:
        inputs = tokenizer(input_text, return_tensors="pt").to(device)
        # NOTE(review): max_length presumably bounds prompt plus generated
        # tokens together, so long prompts leave little room for an answer;
        # confirm whether max_new_tokens is what is wanted.
        outputs = chatbot_model.generate(**inputs, max_length=200)
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        return {"status": "success", "answer": response}
    except Exception as e:
        return {"status": "error", "message": str(e)}
# Video Upload & Analysis API
# NOTE(review): no route decorator is visible on this coroutine; confirm it is
# registered on ``app`` somewhere, otherwise it is never reachable over HTTP.
async def analyze_video(file: UploadFile = File(...)):
    """Save an uploaded video, extract its audio track and transcribe it.

    The upload is written into a private temporary directory that is deleted
    (together with the extracted audio written next to the video) once the
    transcript has been produced or an error has occurred.

    Args:
        file: The uploaded video.

    Returns:
        ``{"status": "success", "transcript": <text>}`` on success, or
        ``{"status": "error", "message": <message>}`` for unexpected failures.

    Raises:
        HTTPException: propagated unchanged from the audio-extraction and
            transcription helpers so their status codes reach the client.
    """
    import tempfile

    try:
        # The filename comes from the client: keep only its last path component
        # so it cannot point outside the working directory.
        client_name = (file.filename or "").replace("\\", "/")
        safe_name = os.path.basename(client_name) or "upload.mp4"

        with tempfile.TemporaryDirectory() as work_dir:
            video_path = os.path.join(work_dir, f"temp_{safe_name}")
            with open(video_path, "wb") as f:
                f.write(await file.read())
            audio_path = extract_audio_from_video(video_path)
            transcript = transcribe_audio(audio_path)
        return {"status": "success", "transcript": transcript}
    except HTTPException:
        # Deliberate HTTP errors must keep their status code.
        raise
    except Exception as e:
        return {"status": "error", "message": str(e)}
# Run the FastAPI server when this file is executed as a script.
if __name__ == "__main__":
    # Listen on every interface, port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")