import json
import os
import re

import google.generativeai as genai
import nest_asyncio
import nltk
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from langchain.chains import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain.prompts import PromptTemplate
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.document_loaders import (
    Docx2txtLoader,
    PyPDFLoader,
    UnstructuredCSVLoader,
    UnstructuredExcelLoader,
    UnstructuredPowerPointLoader,
)
from langchain_community.vectorstores import FAISS
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
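# Assumed, unpinned dependencies for the imports above (not listed in the
# original): fastapi, uvicorn, jinja2, python-multipart (FastAPI form/file
# parsing), nest-asyncio, nltk, google-generativeai, langchain,
# langchain-community, langchain-google-genai, faiss-cpu, pypdf, docx2txt,
# and unstructured with its per-format extras for .csv/.xlsx/.pptx loading.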
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Allow nested event loops during development (useful when running inside an
# environment that already has a running loop, such as a notebook).
if os.getenv("FASTAPI_ENV") == "development":
    nest_asyncio.apply()

# The Unstructured-based loaders rely on NLTK's POS tagger.
nltk.download("averaged_perceptron_tagger_eng")
# Module-level state shared across requests (single-user demo pattern).
uploaded_file_path = None
document_analyzed = False
summary = None
question_responses = []
api = None
llm = None
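# Turn off Gemini's content filtering for every harm category so responses are
# never blocked by safety filters.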
safety_settings = [
    {"category": "HARM_CATEGORY_DANGEROUS", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]
def format_text(text: str) -> str:
    """Convert the model's lightweight markdown into HTML for the template."""
    # **bold** -> <b>bold</b>
    text = re.sub(r"\*\*(.*?)\*\*", r"<b>\1</b>", text)
    # Any remaining single asterisks (bullet markers) become line breaks.
    text = text.replace("*", "<br>")
    return text
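# Example (hypothetical input, for illustration only):
#   format_text("**Summary** * point one * point two")
#   -> "<b>Summary</b> <br> point one <br> point two"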
# Route for the main page. Note: no route decorators were present in the
# original source; the paths used here and below are assumptions.
@app.get("/", response_class=HTMLResponse)
async def read_main(request: Request):
    return templates.TemplateResponse("analyze.html", {
        "request": request,
        "summary": summary,
        "show_conversation": document_analyzed,
        "question_responses": question_responses,
    })
# Route for analyzing documents (path assumed, as above).
@app.post("/analyze", response_class=HTMLResponse)
async def analyze_document(
    request: Request,
    api_key: str = Form(...),
    iam: str = Form(...),
    context: str = Form(...),
    output: str = Form(...),
    summary_length: str = Form(...),
    file: UploadFile = File(...),
):
    global uploaded_file_path, document_analyzed, summary, question_responses, api, llm
    loader = None
    try:
        # Initialize or update the API key and models.
        api = api_key
        genai.configure(api_key=api)
        llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest", google_api_key=api)

        # Save the uploaded file to disk, keeping its original extension.
        uploaded_file_path = "uploaded_file" + os.path.splitext(file.filename)[1]
        with open(uploaded_file_path, "wb") as f:
            f.write(file.file.read())

        # Determine the file type and pick a matching loader.
        file_extension = os.path.splitext(uploaded_file_path)[1].lower()
        print(f"File extension: {file_extension}")  # Debugging statement
        # The Unstructured-based loaders below need the `unstructured` package
        # (with its per-format extras) installed.
        if file_extension == ".pdf":
            loader = PyPDFLoader(uploaded_file_path)
        elif file_extension == ".csv":
            loader = UnstructuredCSVLoader(uploaded_file_path, mode="elements", encoding="utf8")
        elif file_extension == ".xlsx":
            loader = UnstructuredExcelLoader(uploaded_file_path, mode="elements")
        elif file_extension == ".docx":
            loader = Docx2txtLoader(uploaded_file_path)
        elif file_extension == ".pptx":
            loader = UnstructuredPowerPointLoader(uploaded_file_path)
        elif file_extension == ".mp3":
            # Process audio files differently: send them straight to Gemini.
            audio_file = genai.upload_file(path=uploaded_file_path)
            model = genai.GenerativeModel(model_name="gemini-1.5-flash")
            prompt = f"I am an {iam}. This file is about {context}. Answer the question based on this file: {output}. Write a {summary_length} concise summary."
            response = model.generate_content([prompt, audio_file], safety_settings=safety_settings)
            summary = format_text(response.text)
            document_analyzed = True

            outputs = {"summary": summary}
            with open("output_summary.json", "w") as outfile:
                json.dump(outputs, outfile)

            return templates.TemplateResponse("analyze.html", {
                "request": request,
                "summary": summary,
                "show_conversation": document_analyzed,
                "question_responses": question_responses,
            })
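        # Note: genai.upload_file above stages the audio through the Gemini
        # Files API so generate_content can read it alongside the text prompt.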
        # If no loader was selected, the extension is unsupported.
        if loader is None:
            raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}")

        docs = loader.load()

        # "Stuff" all loaded documents into a single summarization prompt.
        prompt_template = PromptTemplate.from_template(
            f"I am an {iam}. This file is about {context}. Answer the question based on this file: {output}. Write a {summary_length} concise summary of the following text: {{text}}"
        )
        llm_chain = LLMChain(llm=llm, prompt=prompt_template)
        stuff_chain = StuffDocumentsChain(llm_chain=llm_chain, document_variable_name="text")
        response = stuff_chain.invoke(docs)
        summary = format_text(response["output_text"])
        document_analyzed = True

        # Persist the summary to the same file the question route updates.
        outputs = {"summary": summary}
        with open("output_summary.json", "w") as outfile:
            json.dump(outputs, outfile)
        return templates.TemplateResponse("analyze.html", {
            "request": request,
            "summary": summary,
            "show_conversation": document_analyzed,
            "question_responses": question_responses,
        })
    except HTTPException:
        # Let deliberate HTTP errors (e.g. unsupported file type) propagate
        # instead of being re-wrapped as a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}")
# Route for asking questions about the uploaded document (path assumed).
@app.post("/ask", response_class=HTMLResponse)
async def ask_question(request: Request, question: str = Form(...)):
    global uploaded_file_path, question_responses, llm, api
    loader = None
    if uploaded_file_path:
        # Determine the file type and pick a matching loader.
        file_extension = os.path.splitext(uploaded_file_path)[1].lower()
        if file_extension == ".pdf":
            loader = PyPDFLoader(uploaded_file_path)
        elif file_extension == ".csv":
            loader = UnstructuredCSVLoader(uploaded_file_path, mode="elements")
        elif file_extension == ".xlsx":
            loader = UnstructuredExcelLoader(uploaded_file_path, mode="elements")
        elif file_extension == ".docx":
            loader = Docx2txtLoader(uploaded_file_path)
        elif file_extension == ".pptx":
            loader = UnstructuredPowerPointLoader(uploaded_file_path)
        elif file_extension == ".mp3":
            audio_file = genai.upload_file(path=uploaded_file_path)
            model = genai.GenerativeModel(model_name="gemini-1.5-flash")
            latest_conversation = request.cookies.get("latest_question_response", "")
            prompt = "Answer the question based on the speech: " + question + (
                f" Latest conversation: {latest_conversation}" if latest_conversation else ""
            )
            response = model.generate_content([prompt, audio_file], safety_settings=safety_settings)
            current_response = response.text
            current_question = f"You asked: {question}"

            # Embed the model's answer and query it back through a one-entry
            # FAISS index, mirroring the document flow below.
            text = current_response  # the answer generated from the MP3 content
            os.environ["GOOGLE_API_KEY"] = api
            embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
            document_search = FAISS.from_texts([text], embeddings)
            if document_search:
                query_embedding = embeddings.embed_query(question)
                results = document_search.similarity_search_by_vector(query_embedding, k=1)
                if results:
                    current_response = results[0].page_content
                else:
                    current_response = "No matching document found in the database."
            else:
                current_response = "Vector database not initialized."

            # Record the question once, with the FAISS-filtered response.
            question_responses.append((current_question, current_response))

            # Save all results, including the FAISS response.
            save_to_json(summary, question_responses)

            # Return the page and remember the latest exchange in a cookie.
            response = templates.TemplateResponse("analyze.html", {
                "request": request,
                "summary": summary,
                "show_conversation": document_analyzed,
                "question_responses": question_responses,
            })
            response.set_cookie(key="latest_question_response", value=current_response)
            return response
        # If no loader was selected, the extension is unsupported.
        if loader is None:
            raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}")

        docs = loader.load()
        text = "\n".join([doc.page_content for doc in docs])
        os.environ["GOOGLE_API_KEY"] = api

        # Split the text into overlapping chunks.
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        chunks = text_splitter.split_text(text)

        # Build the question prompt, carrying over the latest exchange if any.
        # `{text}` is left as a template variable and filled in per chunk.
        latest_conversation = request.cookies.get("latest_question_response", "")
        template1 = question + """ answer the question based on the following:
"{text}"
:""" + (f" Answer the Question with no more than 3 sentences. Latest conversation: {latest_conversation}" if latest_conversation else "")

        prompt1 = PromptTemplate.from_template(template1)
        llm_chain1 = LLMChain(llm=llm, prompt=prompt1)

        # Answer the question chunk by chunk and concatenate the partial answers.
        current_response = ""
        for chunk in chunks:
            response1 = llm_chain1.invoke({"text": chunk})
            current_response += response1["text"] + "\n"
        # Generate embeddings for the combined responses and index them.
        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        document_search = FAISS.from_texts([current_response], embeddings)

        # Query the FAISS vector store if it was initialized.
        if document_search:
            query_embedding = embeddings.embed_query(question)
            results = document_search.similarity_search_by_vector(query_embedding, k=1)
            if results:
                current_response = format_text(results[0].page_content)
            else:
                current_response = "No matching document found in the database."
        else:
            current_response = "Vector database not initialized."
        # Record the question and the FAISS-filtered response.
        current_question = f"You asked: {question}"
        question_responses.append((current_question, current_response))

        # Save all results, then return the page and remember the latest
        # exchange in a cookie.
        save_to_json(summary, question_responses)
        response = templates.TemplateResponse("analyze.html", {
            "request": request,
            "summary": summary,
            "show_conversation": document_analyzed,
            "question_responses": question_responses,
        })
        response.set_cookie(key="latest_question_response", value=current_response)
        return response
    else:
        raise HTTPException(status_code=400, detail="No file has been uploaded yet.")
def save_to_json(summary, question_responses):
    """Persist the summary and the full question/answer history to disk."""
    outputs = {
        "summary": summary,
        "question_responses": question_responses,
    }
    with open("output_summary.json", "w") as outfile:
        json.dump(outputs, outfile)
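# The resulting output_summary.json has this shape (Python tuples serialize
# as JSON arrays):
#   {"summary": "<b>...</b>", "question_responses": [["You asked: ...", "..."]]}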
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)
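# Example request against the analyze route. The paths ("/", "/analyze",
# "/ask") are assumptions added above, since the original had no decorators:
#   curl -X POST http://127.0.0.1:8000/analyze \
#     -F "api_key=YOUR_GEMINI_KEY" -F "iam=engineer" \
#     -F "context=quarterly report" -F "output=key findings" \
#     -F "summary_length=short" -F "file=@report.pdf"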