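"""Streamlit demo: extractive question answering over uploaded PDF, TXT, and
DOCX files, using Haystack pipelines with a Pinecone vector index for retrieval."""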
import logging
import os
import uuid
from json import JSONDecodeError
from pathlib import Path
from typing import List, Optional

import pinecone
import streamlit as st
from annotated_text import annotation
from haystack import BaseComponent, Document
from haystack.document_stores import PineconeDocumentStore
from haystack.nodes import (
    DocxToTextConverter,
    EmbeddingRetriever,
    FARMReader,
    FileTypeClassifier,
    PDFToTextConverter,
    PreProcessor,
    TextConverter,
)
from haystack.pipelines import ExtractiveQAPipeline, Pipeline
from markdown import markdown
from sentence_transformers import SentenceTransformer
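
# Custom Haystack retrieval node that queries the Pinecone index directly with
# sentence-transformers embeddings (instead of going through the document store)
# and maps the stored metadata ("text", "filename", "_split_id") back onto
# Haystack Documents. It reads the module-level `emb_model` and `index` globals
# defined below.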
class PineconeSearch(BaseComponent):
    outgoing_edges = 1

    def run(self, query: str, top_k: Optional[int]):
        # embed the query and search the index, returning matches with metadata
        vector_embedding = emb_model.encode(query).tolist()
        response = index.query([vector_embedding], top_k=top_k, include_metadata=True)
        docs = [
            Document(
                content=d["metadata"]["text"],
                meta={
                    "title": d["metadata"]["filename"],
                    "context": d["metadata"]["text"],
                    "_split_id": d["metadata"]["_split_id"],
                },
            )
            for d in response["matches"]
        ]
        output = {"documents": docs, "query": query}
        return output, "output_1"

    def run_batch(self, queries: List[str], top_k: Optional[int]):
        # batch querying is not used by this demo
        return {}, "output_1"

# connect to the Pinecone environment
pinecone.init(api_key=st.secrets["pinecone_apikey"], environment="us-west1-gcp")
index_name = "qa-demo-fast-384"
# retriever_model = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
retriever_model = "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"
emb_model = SentenceTransformer(retriever_model)
embedding_dim = 384
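# NOTE: embedding_dim must match the retriever model's output size;
# multi-qa-MiniLM-L6-cos-v1 produces 384-dimensional embeddings.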

preprocessor = PreProcessor(
    clean_empty_lines=True,
    clean_whitespace=True,
    clean_header_footer=False,
    split_by="word",
    split_length=100,
    split_respect_sentence_boundary=True,
)
file_type_classifier = FileTypeClassifier()
text_converter = TextConverter()
pdf_converter = PDFToTextConverter()
docx_converter = DocxToTextConverter()
# create the Pinecone index if it does not exist yet
if index_name not in pinecone.list_indexes():
    # remove any existing indexes first, then create the new one
    for delete_index in pinecone.list_indexes():
        pinecone.delete_index(delete_index)
    pinecone.create_index(index_name, dimension=embedding_dim, metric="cosine")
# connect to the index
index = pinecone.Index(index_name)
FILE_UPLOAD_PATH = "./data/uploads/"
os.makedirs(FILE_UPLOAD_PATH, exist_ok=True)

def create_doc_store():
    document_store = PineconeDocumentStore(
        api_key=st.secrets["pinecone_apikey"],
        index=index_name,
        similarity="cosine",
        embedding_dim=embedding_dim,
    )
    return document_store
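
# The keys in `params` must match the node names registered on the query pipeline
# ("Retriever" and "Reader"), so each node receives its own top_k value.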
def query(pipe, question, top_k_reader, top_k_retriever):
    res = pipe.run(
        query=question,
        params={"Retriever": {"top_k": top_k_retriever}, "Reader": {"top_k": top_k_reader}},
    )
    return res

document_store = create_doc_store()
# an EmbeddingRetriever over the document store; only needed for the standard
# ExtractiveQAPipeline variant that is commented out below
retriever = EmbeddingRetriever(
    document_store=document_store,
    embedding_model=retriever_model,
    model_format="sentence_transformers",
)
# reuse the already-loaded sentence-transformers model instead of loading it twice
sentence_encoder = emb_model
reader = FARMReader(model_name_or_path="deepset/roberta-base-squad2", use_gpu=False)
# pipe = ExtractiveQAPipeline(reader, retriever)

# Custom-built extractive QA pipeline: Query -> PineconeSearch -> FARMReader
pipe = Pipeline()
pipe.add_node(component=PineconeSearch(), name="Retriever", inputs=["Query"])
pipe.add_node(component=reader, name="Reader", inputs=["Retriever"])
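
# Indexing pipeline: the FileTypeClassifier routes each uploaded file to the
# matching converter. Its output edges follow Haystack's default file-type order
# (output_1 = txt, output_2 = pdf, output_3 = md, output_4 = docx, output_5 = html).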
indexing_pipeline_with_classification = Pipeline()
indexing_pipeline_with_classification.add_node(
    component=file_type_classifier, name="FileTypeClassifier", inputs=["File"]
)
indexing_pipeline_with_classification.add_node(
    component=text_converter, name="TextConverter", inputs=["FileTypeClassifier.output_1"]
)
indexing_pipeline_with_classification.add_node(
    component=pdf_converter, name="PdfConverter", inputs=["FileTypeClassifier.output_2"]
)
indexing_pipeline_with_classification.add_node(
    component=docx_converter, name="DocxConverter", inputs=["FileTypeClassifier.output_4"]
)
indexing_pipeline_with_classification.add_node(
    component=preprocessor,
    name="Preprocessor",
    inputs=["TextConverter", "PdfConverter", "DocxConverter"],
)
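# The indexing pipeline ends at the Preprocessor: the resulting document splits
# are embedded and upserted into Pinecone by hand in the upload handler below,
# rather than being written through the PineconeDocumentStore.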

def set_state_if_absent(key, value):
    if key not in st.session_state:
        st.session_state[key] = value

# Adjust to a question that you would like users to see in the search bar when they load the UI:
DEFAULT_QUESTION_AT_STARTUP = os.getenv(
    "DEFAULT_QUESTION_AT_STARTUP", "My blog post discusses remote work. Give me statistics."
)
DEFAULT_ANSWER_AT_STARTUP = os.getenv(
    "DEFAULT_ANSWER_AT_STARTUP",
    "7% more remote workers have been at their current organization for 5 years or fewer",
)
# Sliders
DEFAULT_DOCS_FROM_RETRIEVER = int(os.getenv("DEFAULT_DOCS_FROM_RETRIEVER", "3"))
DEFAULT_NUMBER_OF_ANSWERS = int(os.getenv("DEFAULT_NUMBER_OF_ANSWERS", "3"))
st.set_page_config(
    page_title="Haystack Demo", page_icon="https://haystack.deepset.ai/img/HaystackIcon.png"
)
# Persistent state
set_state_if_absent("question", DEFAULT_QUESTION_AT_STARTUP)
set_state_if_absent("answer", DEFAULT_ANSWER_AT_STARTUP)
set_state_if_absent("results", None)

# Small callback to reset the interface in case the text of the question changes
def reset_results(*args):
    st.session_state.answer = None
    st.session_state.results = None
    st.session_state.raw_json = None

# Title
st.write("# Haystack Search Demo")
st.markdown(
    """
This demo takes its data from two sample CSV files containing statistics on various topics. \n
Ask any question about these topics and see if Haystack can find the correct answer to your query! \n
*Note: do not use keywords, but full-fledged questions.* The demo is not optimized to deal with keyword queries and might misunderstand you.
""",
    unsafe_allow_html=True,
)
# Sidebar
st.sidebar.header("Options")
st.sidebar.write("## File Upload:")
data_files = st.sidebar.file_uploader(
    "upload", type=["pdf", "txt", "docx"], accept_multiple_files=True, label_visibility="hidden"
)
ALL_FILES = []
META_DATA = []
for data_file in data_files:
    # save each uploaded file to disk under a unique name
    if data_file:
        file_path = Path(FILE_UPLOAD_PATH) / f"{uuid.uuid4().hex}_{data_file.name}"
        with open(file_path, "wb") as f:
            f.write(data_file.getbuffer())
        ALL_FILES.append(file_path)
        st.sidebar.write(str(data_file.name) + " ✓ ")
        META_DATA.append({"filename": data_file.name})
data_files = []

if len(ALL_FILES) > 0:
    # document_store.update_embeddings(retriever, update_existing_embeddings=False)
    docs = indexing_pipeline_with_classification.run(file_paths=ALL_FILES, meta=META_DATA)[
        "documents"
    ]
    # embed and upsert to Pinecone in batches of 128
    batch_size = 128
    my_bar = st.progress(0)
    upload_count = 0
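    # Pinecone upserts take (id, vector, metadata) tuples; the chunk text is kept
    # in metadata under "text" so that PineconeSearch can rebuild Documents later.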
    for i in range(0, len(docs), batch_size):
        # find end of batch
        i_end = min(i + batch_size, len(docs))
        # extract batch
        batch = [doc.content for doc in docs[i:i_end]]
        # generate embeddings for batch
        emb = sentence_encoder.encode(batch).tolist()
        # get metadata, carrying the chunk text along for retrieval
        meta = []
        for doc in docs[i:i_end]:
            meta_dict = doc.meta
            meta_dict["text"] = doc.content
            meta.append(meta_dict)
        # use the Haystack document IDs as unique vector IDs
        ids = [doc.id for doc in docs[i:i_end]]
        # add all to the upsert list
        to_upsert = list(zip(ids, emb, meta))
        # upsert/insert these records to Pinecone
        _ = index.upsert(vectors=to_upsert)
        upload_count += batch_size
        upload_percentage = min(int((upload_count / len(docs)) * 100), 100)
        my_bar.progress(upload_percentage)

top_k_reader = st.sidebar.slider(
    "Max. number of answers",
    min_value=1,
    max_value=10,
    value=DEFAULT_NUMBER_OF_ANSWERS,
    step=1,
    on_change=reset_results,
)
top_k_retriever = st.sidebar.slider(
    "Max. number of documents from retriever",
    min_value=1,
    max_value=10,
    value=DEFAULT_DOCS_FROM_RETRIEVER,
    step=1,
    on_change=reset_results,
)
question = st.text_input(
    value=st.session_state.question,
    max_chars=100,
    on_change=reset_results,
    label="question",
    label_visibility="hidden",
)
col1, col2 = st.columns(2)
col1.markdown("<style>.stButton button {width:100%;}</style>", unsafe_allow_html=True)
col2.markdown("<style>.stButton button {width:100%;}</style>", unsafe_allow_html=True)
# Run button
run_pressed = col1.button("Run")
# re-run when the button is pressed or the question text has changed
run_query = run_pressed or question != st.session_state.question

# Get results for query
if run_query and question:
    reset_results()
    st.session_state.question = question
    with st.spinner("🧠 Performing neural search on documents... \n "):
        try:
            st.session_state.results = query(
                pipe, question, top_k_reader=top_k_reader, top_k_retriever=top_k_retriever
            )
        except JSONDecodeError:
            st.error("👓 An error occurred reading the results. Is the document store working?")
        except Exception as e:
            logging.exception(e)
            if "The server is busy processing requests" in str(e) or "503" in str(e):
                st.error("🧑‍🌾 All our workers are busy! Try again later.")
            else:
                st.error(f"🐞 An error occurred during the request. {str(e)}")

if st.session_state.results:
    st.write("## Results:")
    for result in st.session_state.results["answers"]:
        answer, context = result.answer, result.context
        start_idx = context.find(answer)
        end_idx = start_idx + len(answer)
        # fall back to the "filename" field when "title" is missing from the metadata
        filename = result.meta.get("title") or result.meta.get("filename", "")
        # Hack due to this bug: https://github.com/streamlit/streamlit/issues/3190
        st.write(
            markdown(
                f'From file: {filename} \n {context[:start_idx]} {str(annotation(answer, "ANSWER", "#8ef"))} {context[end_idx:]} \n '
            ),
            unsafe_allow_html=True,
        )