Spaces:
Build error
Build error
| import spacy | |
| from spacy.language import Language | |
| from spacy.lang.it import Italian | |
| import re | |
| from transformers import pipeline | |
| from gradio.inputs import File | |
| import gradio as gr | |
| from pdf2image import convert_from_path | |
| import pytesseract | |
| import tempfile | |
| import os | |
| from gradio.inputs import Dropdown | |
| import gradio as gr | |
| import tempfile | |
| import os | |
| from pdf2image import convert_from_path | |
| import pytesseract | |
| import fitz | |
| from pdf2image import convert_from_bytes | |
def preprocess_punctuation(text):
    """Return the distinct abbreviation-like tokens found in ``text``.

    A token qualifies when it is a short run (1-4 characters) of letters
    and dots, optionally followed by more dot-separated runs, that ends
    with a dot, is not preceded by a lowercase letter, and is not followed
    by optional whitespace plus an uppercase letter.

    The result is de-duplicated through a set, so its order is unspecified.
    """
    abbreviation_re = r'(?<![a-z])[a-zA-Z\.]{1,4}(?:\.[a-zA-Z\.]{1,4})*\.(?!\s*[A-Z])'
    unique_hits = set(re.findall(abbreviation_re, text))
    return list(unique_hits)
def preprocess_text(text):
    """Collapse blank lines so that ``text`` contains no empty lines.

    Two substitutions are applied in order: first every pair of newlines
    separated only by whitespace becomes a single newline, then any
    remaining run of two or more newlines is squeezed to one.
    """
    collapsed = text
    for blank_run in (r'\n\s*\n', r'\n{2,}'):
        collapsed = re.sub(blank_run, '\n', collapsed)
    return collapsed
def custom_tokenizer(doc):
    """Prevent a sentence from starting right after a colon.

    For every token except the last one whose text is exactly ":", the
    following token's ``is_sent_start`` is set to ``False``. The same
    ``doc`` object is returned. ``get_sentences`` adds this to its spaCy
    pipeline under the name "custom_tokenizer", before the parser.
    """
    colons = (tok for tok in doc[:-1] if tok.text == ":")
    for colon in colons:
        # The token after the colon must stay in the current sentence.
        doc[colon.i + 1].is_sent_start = False
    return doc
def get_sentences(text, dictionary = None):
    """Split ``text`` into cleaned sentences with the Italian spaCy model.

    The "it_core_news_lg" pipeline is loaded, the ``custom_tokenizer``
    component is inserted before the parser, and every abbreviation
    reported by ``preprocess_punctuation`` is registered as a tokenizer
    special case so that it is kept as a single token.

    Args:
        text: The text to segment.
        dictionary: Unused; kept for backward compatibility.

    Returns:
        A list of non-empty sentences, each stripped of surrounding spaces
        and newlines and with runs of spaces collapsed to a single space.
    """
    # add_pipe() resolves components by registered name; without this
    # registration the string "custom_tokenizer" is unknown to spaCy.
    # The guard keeps repeated calls from registering it twice.
    if not Language.has_factory("custom_tokenizer"):
        Language.component("custom_tokenizer", func=custom_tokenizer)

    nlp = spacy.load("it_core_news_lg")  # Italian model
    nlp.add_pipe("custom_tokenizer", before="parser")
    for punct in preprocess_punctuation(text):
        nlp.tokenizer.add_special_case(punct, [{spacy.symbols.ORTH: punct, spacy.symbols.NORM: punct}])

    strip_chars = ' \n'
    cleaned = []
    for span in nlp(text).sents:
        # Split on single spaces only, so inner newlines are preserved.
        words = [word for word in span.text.strip(strip_chars).split(' ') if word]
        if words:
            cleaned.append(' '.join(words))
    return cleaned
def _parse_number(word):
    """Return the number embedded in ``word``, or ``None`` if there is none."""
    # Keep digits and separators, treating a comma as the decimal point.
    decimal_text = re.sub(r'[^0-9.,]+', '', word).replace(',', '.')
    if decimal_text.replace('.', '', 1).isdigit():
        return float(decimal_text)
    # More than one separator: fall back to the bare digits as an integer.
    digits_only = re.sub(r'[^0-9]+', '', word)
    if digits_only.isdigit():
        return int(digits_only)
    return None


def extract_numbers(text, given_strings):
    """Collect the numbers that appear next to any of ``given_strings``.

    ``text`` is split on whitespace. For every word that contains one of
    ``given_strings`` as a substring, a window made of the previous word,
    the word itself and the next word is inspected. Windows in which any
    word starts with ``+``, ``*`` or ``/`` are skipped.

    Because matching is done word by word, a key that itself contains
    whitespace can never match.

    Args:
        text: The sentence to scan.
        given_strings: Iterable of substrings that mark a word of interest.

    Returns:
        A flat list with one entry per inspected word: a ``float`` or
        ``int`` when the word carries a number, ``None`` otherwise.
        Overlapping windows contribute their words more than once.
    """
    words = text.split()
    numbers = []
    for index, word in enumerate(words):
        if not any(s in word for s in given_strings):
            continue
        context = words[max(index - 1, 0):min(index + 2, len(words))]
        # A leading operator suggests a formula rather than a measurement.
        if any(re.match(r'[+\*/]', neighbour) for neighbour in context):
            continue
        numbers.extend(_parse_number(neighbour) for neighbour in context)
    return numbers
def get_text_and_values(text, key_list):
    """Map each relevant sentence of ``text`` to the values found in it.

    The text is segmented with ``get_sentences`` and each sentence is
    passed to ``extract_numbers`` together with ``key_list``. Only the
    sentences for which a non-empty list comes back are kept.

    Returns:
        A dict whose keys are sentences and whose values are the lists
        returned by ``extract_numbers``.
    """
    info = {}
    for sentence in get_sentences(text):
        found = extract_numbers(text = sentence, given_strings = key_list)
        if found:
            info[sentence] = found
    return info
def get_useful_text(dictionary):
    """Join the keys of ``dictionary`` into one string.

    Consecutive keys are separated by a line of dashes placed on its own
    line. An empty mapping yields an empty string.
    """
    separator = '\n------------------------\n'
    return separator.join(dictionary.keys())
def get_values(dictionary):
    """Return the values of ``dictionary`` as a list, in iteration order."""
    return [*dictionary.values()]
def initialize_qa_transformer(model):
    """Create a "text2text-generation" pipeline for the given ``model``."""
    return pipeline("text2text-generation", model=model)
def get_answers_unfiltered(dictionary, question, qa_pipeline):
    """Ask ``question`` about every key of ``dictionary``.

    For each key a prompt of the form "<key> Domanda: <question>" is sent
    to ``qa_pipeline``.

    Returns:
        The list of raw ``qa_pipeline`` results, one per key, in key order.
    """
    return [
        qa_pipeline(f'{sentence} Domanda: {question}')
        for sentence in dictionary.keys()
    ]
def get_total(answered_values, text, keywords, raw_values, unique_values = False, min_value = 5.0):
    """Validate the model answers and compute the overall total.

    Numbers are pulled out of every string value in ``answered_values``
    (commas are read as decimal points). A number is kept only when it is
    at least ``min_value`` and also occurs among the numeric entries of
    ``raw_values``.

    The total is then chosen as follows: for each keyword, ``text`` is
    searched (case-insensitively) for the keyword followed by up to three
    words and an integer. Integers that are also among the kept numbers
    are treated as stated totals and summed. If no stated total is found,
    the total is the sum of the kept numbers.

    Args:
        answered_values: Iterable of lists of dicts with string values.
        text: Full document text searched for stated totals.
        keywords: Words that introduce a stated total.
        raw_values: Iterable of lists; non-numeric entries are ignored.
        unique_values: When true, duplicate numbers are removed, keeping
            first-seen order.
        min_value: Smallest value accepted from the answers.

    Returns:
        A ``(numbers, total)`` tuple.
    """
    known_values = {num for sublist in raw_values for num in sublist if isinstance(num, (int, float))}

    number_re = re.compile(r'\d+(?:[.,]\d+)?')
    numbers = []
    for answer_group in answered_values:
        for answer in answer_group:
            for generated in answer.values():
                for token in number_re.findall(generated.replace(',', '.')):
                    value = float(token)
                    if value >= min_value and value in known_values:
                        numbers.append(value)

    if unique_values:
        numbers = list(dict.fromkeys(numbers))

    stated_totals = []
    counted_spans = set()
    for keyword in keywords:
        # Keyword, up to three intervening words, then an integer.
        keyword_pattern = f'{re.escape(keyword)}(\\s+\\w+){{0,3}}\\s+(\\d+)'
        match = re.search(keyword_pattern, text, re.IGNORECASE)
        if not match:
            continue
        number = match.group(2)
        # group(2) is a string; it must be converted before it can be
        # compared with the numeric values collected above.
        value = int(number)
        if value not in numbers:
            continue
        # Two keywords may lead to the very same number in the text;
        # count that occurrence only once.
        if match.span(2) in counted_spans:
            continue
        counted_spans.add(match.span(2))
        stated_totals.append(value)
        print(f"Found a value ({number}) for keyword '{keyword}'.")

    # Several stated totals are added together; without any, fall back to
    # the sum of all validated numbers.
    grand_total = sum(stated_totals) if stated_totals else sum(numbers)
    return numbers, grand_total
def extractor_clean(text, k_words, transformer, question, total_kwords, return_text = False):
    """Run the full extraction on ``text`` and return the validated values.

    Steps: select the sentences containing ``k_words`` together with their
    raw numbers, build the question-answering pipeline for ``transformer``,
    ask ``question`` about every selected sentence, then validate the
    answers and compute the total with ``get_total`` (duplicates removed).

    Args:
        text: Document text.
        k_words: Substrings that mark a sentence as relevant.
        transformer: Model identifier passed to the pipeline factory.
        question: Question asked about each relevant sentence.
        total_kwords: Keywords that introduce a stated total in ``text``.
        return_text: When truthy, the selected sentences are returned too.

    Returns:
        ``(values, return_text, sentences_text)`` when ``return_text`` is
        truthy, otherwise ``(values, return_text)``, where ``values`` is
        the ``(numbers, total)`` tuple produced by ``get_total``.
    """
    dictionary = get_text_and_values(text, k_words)
    raw = get_values(dictionary)
    qa = initialize_qa_transformer(transformer)
    answers = get_answers_unfiltered(dictionary, question = question, qa_pipeline = qa)
    values = get_total(answered_values = answers, raw_values = raw, text = text, keywords = total_kwords, unique_values = True)
    if return_text:
        return values, return_text, get_useful_text(dictionary)
    # Plain fall-through so every falsy flag (None, 0, "") still gets a
    # result instead of an implicit None.
    return values, return_text
def pdf_ocr(file, model_t, question):
    """Extract the area values from a PDF file.

    The embedded text layer is read first. When it holds nothing but
    whitespace, the pages are rendered to images and read with Tesseract
    using the Italian language data. The resulting text is then handed to
    ``extractor_clean``.

    Args:
        file: Path of the PDF file.
        model_t: Model identifier for the question-answering pipeline.
        question: Question asked about each relevant sentence.

    Returns:
        A tuple ``(values_text, total_text, reference_text)``: the numbered
        list of values in ascending order, the total followed by "Mq", and
        the sentences the values were taken from.
    """
    with open(file, "rb") as f:
        content = f.read()

    with fitz.open(stream=content, filetype="pdf") as doc:
        text = "".join(page.get_text() for page in doc)

    # A text layer made only of whitespace is as useless as an empty one,
    # so it also triggers the OCR fallback.
    if not text.strip():
        text = "".join(
            pytesseract.image_to_string(img, lang='ita')
            for img in convert_from_bytes(content)
        )

    ks = ('mq', 'MQ', 'Mq', 'metri quadri', 'm2')
    totalK = ['totale', 'complessivo', 'complessiva']
    extracted_values = extractor_clean(text=text, k_words=ks, transformer=model_t, question=question, total_kwords=totalK, return_text=True)
    values_output, total = extracted_values[0]
    text_output = extracted_values[2]

    total_output = f'{total} Mq'
    immobile_values = '\n'.join(
        f'{i + 1}. Immobile : {value} Mq\n'
        for i, value in enumerate(sorted(values_output))
    )
    return immobile_values, total_output, text_output
def ocr_interface(pdf_file, model_t='it5/it5-base-question-answering', question="Quanti metri quadri misura l'immobile?"):
    """UI callback: run ``pdf_ocr`` on the uploaded file.

    ``pdf_file`` must expose the path of the uploaded document through its
    ``name`` attribute. Returns the ``(values, total, text)`` triple
    produced by ``pdf_ocr``.
    """
    source_path = pdf_file.name
    values, total, text = pdf_ocr(source_path, model_t, question)
    return (values, total, text)
# Build the Gradio UI and start the app.
# NOTE(review): the original indentation of this section was lost, so the
# nesting of the layout containers below is reconstructed - confirm it
# against the intended layout.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Page header.
    gr.Markdown(
'''
# PDF Mq Extractor
Demo for ITAL-IA
''')
    with gr.Tab("Extractor"):
        # Inputs: the PDF to analyse, the model and the question to ask.
        with gr.Row():
            pdf_input = gr.components.File(label="PDF File")
        with gr.Row():
            model_input = gr.components.Dropdown(['it5/it5-base-question-answering', 'it5/it5-small-question-answering'],
                                                 value='it5/it5-base-question-answering', label = 'Select model')
            question_input = gr.components.Dropdown(["Quanti metri quadri misura l'immobile?"],
                                                    value = "Quanti metri quadri misura l'immobile?", label = 'Question')
        with gr.Column():
            gr.Markdown(
'''
# Output values
Values extracted from the pdf document
''')
        # Outputs, filled from the three values returned by ocr_interface.
        with gr.Row():
            text_output = gr.components.Textbox(label="Ref. Text")
            values_output = gr.components.Textbox(label="Area Values - sorted by value")
            total_output = gr.components.Textbox(label="Total")
        with gr.Row():
            extract_button = gr.Button("Extract")
        # ocr_interface returns (values, total, text), matching the order
        # of the output components listed here.
        extract_button.click(fn = ocr_interface,
                             inputs=[pdf_input, model_input, question_input], outputs=[values_output, total_output, text_output])
        # Example PDFs referenced by file name; only pdf_input is supplied,
        # so ocr_interface runs with its default model and question.
        gr.Examples(['Example1(scannedDoc).pdf', 'Example2.pdf', 'Example3Large.pdf'], inputs = pdf_input,
                    cache_examples = True, fn = ocr_interface, outputs = [values_output, total_output, text_output])
demo.launch()