import json
import os
import re
import sys
import time
import asyncio
from itertools import combinations as itertools_combinations

import numpy as np
import aiohttp
import httpx
import nest_asyncio
import requests
import streamlit as st
from bs4 import BeautifulSoup
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from SPARQLWrapper import SPARQLWrapper, JSON

nest_asyncio.apply()
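
# Working directories for intermediate results: candidate QIDs and gathered entity information.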
folder_path = '/home/user/app/qids_folder'
if not os.path.exists(folder_path):
    os.mkdir(folder_path)

folder_path_1 = '/home/user/app/info_extraction'
if not os.path.exists(folder_path_1):
    os.mkdir(folder_path_1)
    print(f"Folder created at {folder_path_1}")

model = SentenceTransformer("Lajavaness/bilingual-embedding-large", trust_remote_code=True)
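
# Shared aiohttp helper: GET a URL and return the decoded JSON payload.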
async def fetch_json(url, session):
    async with session.get(url) as response:
        return await response.json()
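
# Candidate Retrieval helper (deep search): search Wikipedia for every two-word combination of the mention's tokens.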
async def combination_method(name, session):
    data = set()
    new_name = name.split()
    for pair in itertools_combinations(new_name, 2):
        new_word = pair[0] + " " + pair[1]
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={new_word}&srlimit=20&srprop=&srenablerewrites=True&format=json"
        json_data = await fetch_json(url, session)
        suggestions = json_data.get('query', {}).get('search', [])
        for page in suggestions:
            data.add(page.get('title', ''))
    return data
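
# Candidate Retrieval helper (deep search): search Wikipedia for each individual token of the mention.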
async def single_method(name, session):
    data = set()
    new_name = name.replace("-", " ").replace("/", " ").split()
    for word in new_name:
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={word}&srlimit=20&srprop=&srenablerewrites=True&format=json"
        json_data = await fetch_json(url, session)
        suggestions = json_data.get('query', {}).get('search', [])
        for page in suggestions:
            data.add(page.get('title', ''))
    return data
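
# Candidate Retrieval module: gather Wikipedia titles for a mention (including spelling suggestions,
# disambiguation-page links and, optionally, deep-search variants), resolve them to Wikidata QIDs
# and cache the result in qids_folder/<mention>.json.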
async def mains(name, deep_search):
    data = set()
    disam_data = set()
    qids = set()
    async with aiohttp.ClientSession() as session:
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={name}&srlimit=20&srprop=&srenablerewrites=True&format=json"
        json_data = await fetch_json(url, session)
        suggestions = json_data.get('query', {}).get('search', [])
        for page in suggestions:
            data.add(page.get('title', ''))
        # Follow the search engine's spelling suggestion, if any.
        wikipedia_url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={name}&srlimit=1&srprop=&srenablerewrites=True&srinfo=suggestion&format=json"
        json_data = await fetch_json(wikipedia_url, session)
        suggestion = json_data.get('query', {}).get('searchinfo', {}).get('suggestion')
        if suggestion:
            suggested_url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={suggestion}&srlimit=10&srprop=&srenablerewrites=True&srinfo=suggestion&format=json"
            json_suggestion = await fetch_json(suggested_url, session)
            results = json_suggestion.get('query', {}).get('search', [])
            for result in results:
                data.add(result.get('title'))
        # Handle disambiguation links: if a title points to a disambiguation page,
        # keep the pages it links to instead of the page itself.
        if data:
            for ids in data:
                titles = set()
                wikipedia_disambiguation = f"https://en.wikipedia.org/w/api.php?action=query&generator=links&format=json&redirects=1&titles={ids}&prop=pageprops&gpllimit=500&ppprop=wikibase_item"
                json_id = await fetch_json(wikipedia_disambiguation, session)
                try:
                    pages = json_id.get('query', {}).get('pages', {})
                    for k, v in pages.items():
                        titles.add(v.get("title"))
                except Exception:
                    pass
                if "Help:Disambiguation" in titles:
                    for title in titles:
                        if ":" not in title and "disambiguation" not in title:
                            disam_data.add(title)
                else:
                    disam_data.add(ids)
        # Deep search: also query two-word combinations of the name.
        if deep_search == "Yes" and len(name.replace("-", " ").split()) >= 3:
            combination_names = await combination_method(name, session)
            disam_data.update(combination_names)
        # Deep search: also query every word on its own.
        if deep_search == "Yes" and len(name.replace("-", " ").replace("/", " ").split()) >= 2:
            singles = await single_method(name, session)
            disam_data.update(singles)
        # Map each collected title to its Wikidata QID.
        for ids in disam_data:
            try:
                wikibase_url = f"https://en.wikipedia.org/w/api.php?action=query&titles={ids}&prop=pageprops&format=json"
                json_qid = await fetch_json(wikibase_url, session)
                pages = json_qid.get('query', {}).get('pages', {})
                for page_id, page_data in pages.items():
                    wikibase_item = page_data.get('pageprops', {}).get('wikibase_item', None)
                    if wikibase_item:
                        qids.add(wikibase_item)
            except Exception:
                pass
    with open(f"/home/user/app/qids_folder/{name}.json", "w") as f:
        json.dump(list(qids), f)
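
# SPARQL helpers for the Wikidata Query Service: get_results is awaited from coroutines,
# get_resultss is its synchronous counterpart used in the Streamlit flow.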
async def get_results(query):
    user_agent = "WDQS-example Python/%s.%s" % (sys.version_info[0], sys.version_info[1])
    url = "https://query.wikidata.org/sparql"
    sparql = SPARQLWrapper(url, agent=user_agent)
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    return sparql.query().convert()

def get_resultss(query):
    user_agent = "WDQS-example Python/%s.%s" % (sys.version_info[0], sys.version_info[1])
    url = "https://query.wikidata.org/sparql"
    sparql = SPARQLWrapper(url, agent=user_agent)
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    return sparql.query().convert()
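
# Strip escape characters, curly-brace template residue and excess whitespace from extracted text.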
def cleaner(text):
    text = text.replace('\\', '').replace('\n', ' ')
    text = re.sub(r'\{.*?\}', '', text)
    text = re.sub(' +', ' ', text).strip()
    return text
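
# Information Gathering module: for a QID, fetch its English label, aliases and a short description
# (the first sentences of its Wikipedia article, falling back to the Wikidata description).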
async def retriever(qid):
    async with aiohttp.ClientSession() as session:
        list_with_sent = []
        # English label of the entity.
        query_label = f"""SELECT ?subjectLabel
        WHERE {{
            wd:{qid} rdfs:label ?subjectLabel .
            FILTER(LANG(?subjectLabel) = "en")
        }}
        """
        results = await get_results(query_label)
        label = None
        if results["results"]["bindings"]:
            for result in results["results"]["bindings"]:
                for key, value in result.items():
                    label = value.get("value", "").lower()
        # English aliases of the entity.
        query_alias = f"""SELECT ?alias
        WHERE {{
            wd:{qid} skos:altLabel ?alias .
            FILTER(LANG(?alias) = "en")
        }}
        """
        alias_list = []
        results = await get_results(query_alias)
        for result in results["results"]["bindings"]:
            for key, value in result.items():
                alias = value.get("value", "None")
                alias_list.append(alias)
        # English Wikipedia article about the entity, used for the first-paragraph description.
        query_desci = f"""SELECT ?subjectLabel
        WHERE {{
            ?subjectLabel schema:about wd:{qid} ;
                          schema:inLanguage "en" ;
                          schema:isPartOf <https://en.wikipedia.org/> .
        }}
        """
        results = await get_results(query_desci)
        cleaned_first_para = "None"
        if results["results"]["bindings"]:
            for result in results["results"]["bindings"]:
                for key, value in result.items():
                    desc = value.get("value", "None")
                    title = desc.split("/wiki/")[1]
                    url = f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&titles={title}&exintro=&exsentences=2&explaintext=&redirects=&formatversion=2&format=json"
                    json_data = await fetch_json(url, session)
                    cleaned_first_para = cleaner(json_data.get('query', {}).get('pages', [{}])[0].get('extract', 'None'))
        else:
            # Fall back to the Wikidata description when no Wikipedia article exists.
            query_desc = f"""SELECT ?subjectLabel
            WHERE {{
                wd:{qid} schema:description ?subjectLabel .
                FILTER(LANG(?subjectLabel) = "en")
            }}
            """
            results = await get_results(query_desc)
            if results["results"]["bindings"]:
                for result in results["results"]["bindings"]:
                    for key, value in result.items():
                        cleaned_first_para = value.get("value", "None")
        list_with_sent.append({"qid": qid, "label": label, "description": cleaned_first_para})
        if alias_list:
            for alias in alias_list:
                list_with_sent.append({"qid": qid, "label": alias.lower(), "description": cleaned_first_para})
        return list_with_sent
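
# Information Gathering driver: load the cached QIDs for a mention, retrieve their descriptions
# and cache them in info_extraction/<mention>.json.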
async def main(name):
    with open(f"/home/user/app/qids_folder/{name}.json", "r") as f:
        qids = json.load(f)
    final_list = []
    for q in qids:
        returned_list = await retriever(q)
        if returned_list:
            final_list.extend(returned_list)
    with open(f"/home/user/app/info_extraction/{name}.json", "w", encoding="utf-8") as flast:
        json.dump(final_list, flast)
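
# Streamlit front end: runs the five-step pipeline (Data Normalization, Candidate Retrieval,
# Information Gathering, Candidate Selection, Candidate Matching).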
def main_cli():
    st.title("✨ Entity Linking Application ✨")
    st.caption("This web application is part of my master’s dissertation.")
    if 'run_button' in st.session_state and st.session_state.run_button:
        st.session_state.running = True
    else:
        st.session_state.running = False
    api_token = st.text_input("Enter your API key from [GitHub](https://github.com/marketplace/models/azure-openai/gpt-4o):", "", type="password", disabled=st.session_state.running)
    if api_token:
        endpoint = "https://models.inference.ai.azure.com"
        model_name = "gpt-4o"
        client = OpenAI(
            base_url=endpoint,
            api_key=api_token,
        )
        st.success("API Token is set for this session.")
    else:
        st.warning("Please enter an API token to proceed.")
    input_sentence_user = st.text_input("Enter a sentence:", "", disabled=st.session_state.running)
    input_mention_user = st.text_input("Enter a textual reference (mention) that is inside the sentence:", "", disabled=st.session_state.running)
    deep_search = st.selectbox("Perform deep search? (Useful for difficult mentions)", ['Yes', 'No'], index=1, disabled=st.session_state.running)
    if st.button("Run Entity Linking", key="run_button", disabled=st.session_state.running):
        # Start timing as soon as the run is triggered, so the execution time is always defined.
        start_time = time.time()
        if input_sentence_user and input_mention_user:
            # Check if the mention is in the sentence.
            if input_mention_user in input_sentence_user:
                with st.spinner("Applying Data Normalization module... (1/5)"):
                    # Data Normalization: expand acronyms and split compound mentions.
                    list_with_full_names = []
                    list_with_names_to_show = []
                    response = client.chat.completions.create(
                        messages=[
                            {
                                "role": "system",
                                "content": """
                                I will give you one or more labels within a sentence. Your task is as follows:
                                Identify each label in the sentence, and check if it is an acronym.
                                If the label is an acronym, respond with the full name of the acronym.
                                If the label is not an acronym, respond with the label exactly as it was given to you.
                                If a label contains multiple terms (e.g., 'phase and DIC microscopy'), treat each term within the label as a separate label.
                                This means you should identify and explain each part of the label individually.
                                Each part should be on its own line in the response.
                                Context-Specific Terms: If the sentence context suggests a relevant term that applies to each label (such as "study" in 'morphological, sedimentological, and stratigraphical study'), add that term to each label's explanation.
                                Use context clues to determine the appropriate term to add (e.g., 'study' or 'microscopy').
                                Output Format: Your response should contain only the explanations, formatted as follows:
                                Each label or part of a label should be on a new line.
                                Do not include any additional text, and do not repeat the original sentence.
                                Example 1:
                                Input:
                                label: phase and DIC microscopy
                                context: Tardigrades have been extracted from samples using centrifugation with Ludox AM™ and mounted on individual microscope slides in Hoyer's medium for identification under phase and DIC microscopy.
                                Expected response:
                                phase: phase microscopy
                                DIC microscopy: Differential interference contrast microscopy
                                Example 2:
                                Input:
                                label: morphological, sedimentological, and stratigraphical study
                                context: This paper presents results of a morphological, sedimentological, and stratigraphical study of relict beach ridges formed on a prograded coastal barrier in Bream Bay, North Island New Zealand.
                                Expected response:
                                morphological: morphological study
                                sedimentological: sedimentological study
                                stratigraphical: stratigraphical study
                                IMPORTANT:
                                Each label, even if nested within another, should be treated as an individual item.
                                Each individual label or acronym should be output on a separate line.
                                """
                            },
                            {
                                "role": "user",
                                "content": f"label:{input_mention_user}, context:{input_sentence_user}"
                            }
                        ],
                        temperature=1.0,
                        top_p=1.0,
                        max_tokens=1000,
                        model=model_name
                    )
                    kati = response.choices[0].message.content.splitlines()
                    print(response.choices[0].message.content)
                    # Each line is of the form "original label: normalized/expanded form".
                    for i in kati:
                        context = i.split(":")[-1].strip()
                        original_name = i.split(":")[0].strip()
                        list_with_full_names.append(context)
                        list_with_names_to_show.append(original_name)
                    name = ",".join(list_with_full_names)
                    input_sentence_user = input_sentence_user.replace(input_mention_user, name)  # Replace the mention with its normalized form
                    response = client.chat.completions.create(
                        messages=[
                            {
                                "role": "system",
                                "content": "Given a label or labels within a sentence, provide a brief description (2-3 sentences) explaining what the label represents, similar to how a Wikipedia entry would. Format your response as follows: label: description. I want only the description of the label, not the role in the context. Include the label in the description as well. For example: Sentiment analysis: Sentiment analysis is the use of natural language processing, text analysis, computational linguistics, and biometrics to systematically identify, extract, quantify, and study affective states and subjective information.\nText analysis: Text mining, text data mining (TDM) or text analytics is the process of deriving high-quality information from text. It involves the discovery by computer of new, previously unknown information, by automatically extracting information from different written resources.",
                            },
                            {
                                "role": "user",
                                "content": f"label:{name}, context:{input_sentence_user}"
                            }
                        ],
                        temperature=1.0,
                        top_p=1.0,
                        max_tokens=1000,
                        model=model_name
                    )
                    z = response.choices[0].message.content.splitlines()
                    print(response.choices[0].message.content)
                    list_with_contexts = []
                    for i in z:
                        context = i.split(":")[-1].strip()
                        list_with_contexts.append(context)
                st.write("✅ Applied Data Normalization module (1/5)")
                # Candidate Retrieval & Information Gathering
                async def big_main(mention, deep_search):
                    mention = mention.split(",")
                    with st.spinner("Applying Candidate Retrieval module... (2/5)"):
                        for i in mention:
                            await mains(i, deep_search)
                    st.write("✅ Applied Candidate Retrieval module (2/5)")
                    with st.spinner("Applying Information Gathering module... (3/5)"):
                        for i in mention:
                            await main(i)
                    st.write("✅ Applied Information Gathering module (3/5)")
                asyncio.run(big_main(name, deep_search))
                number = 0
                for i, j, o in zip(list_with_full_names, list_with_contexts, list_with_names_to_show):
                    number += 1
                    with st.spinner(f"Applying Candidate Selection module... (4/5) [{number}/{len(list_with_full_names)}] (This may take a while due to limited resources)"):
                        with open(f"/home/user/app/info_extraction/{i}.json", "r") as f:
                            json_file = json.load(f)
                        lista = []
                        lista_1 = []
                        my_bar = st.progress(0)
                        # Embed each candidate's label and description.
                        for index, element in enumerate(json_file):
                            qid = element.get("qid")
                            link = f"https://www.wikidata.org/wiki/{qid}"
                            label = element.get("label")
                            description = element.get("description")
                            label_emb = model.encode([label])
                            desc_emb = model.encode([description])
                            lista.append({link: [label_emb, desc_emb]})
                            my_bar.progress((index + 1) / len(json_file))
                            print(qid)
                        # Score candidates by the mean cosine similarity between the mention/description
                        # embeddings and each candidate's label/description embeddings.
                        label_dataset_emb = model.encode([i])
                        desc_dataset_emb = model.encode([j])
                        for emb in lista:
                            for k, v in emb.items():
                                cossim_label = model.similarity(label_dataset_emb, v[0][0])
                                cossim_desc = model.similarity(desc_dataset_emb, v[1][0])
                                emb_mean = np.mean([cossim_label, cossim_desc])
                                lista_1.append({k: emb_mean})
                        sorted_data = sorted(lista_1, key=lambda x: list(x.values())[0], reverse=True)
                        my_bar.empty()
                    st.write(f"✅ Applied Candidate Selection module (4/5) [{number}/{len(list_with_full_names)}]")
| with st.spinner(f"Applying Candidate Matching module... (5/5) [{number}/{len(list_with_full_names)}]"): | |
| if sorted_data: | |
| sorted_top = sorted_data[0] | |
| for k, v in sorted_top.items(): | |
| qid = k.split("/")[-1] | |
| wikidata2wikipedia = f""" | |
| SELECT ?wikipedia | |
| WHERE {{ | |
| ?wikipedia schema:about wd:{qid} . | |
| ?wikipedia schema:isPartOf <https://en.wikipedia.org/> . | |
| }} | |
| """ | |
| results = get_resultss(wikidata2wikipedia) | |
| for result in results["results"]["bindings"]: | |
| for key, value in result.items(): | |
| wikipedia = value.get("value", "None") | |
| sparql = SPARQLWrapper("http://dbpedia.org/sparql") | |
| wikidata2dbpedia = f""" | |
| SELECT ?dbpedia | |
| WHERE {{ | |
| ?dbpedia owl:sameAs <http://www.wikidata.org/entity/{qid}>. | |
| }} | |
| """ | |
| sparql.setQuery(wikidata2dbpedia) | |
| sparql.setReturnFormat(JSON) | |
| results = sparql.query().convert() | |
| for result in results["results"]["bindings"]: | |
| dbpedia = result["dbpedia"]["value"] | |
| st.write(f"✅ Applied Candidate Matching module (5/5) [{number}/{len(list_with_full_names)}]") | |
| st.text(f"The correct entity for '{o}' is:") | |
| st.success(f"Wikipedia: {wikipedia}") | |
| st.success(f"Wikidata: {k}") | |
| st.success(f"DBpedia: {dbpedia}") | |
| else: | |
| st.warning(f"The entity: {o} is NIL.") | |
            else:
                st.warning(f"The mention '{input_mention_user}' was NOT found in the sentence.")
        else:
            st.warning("Please fill in both fields.")
        end_time = time.time()
        execution_time = end_time - start_time
        ETA = time.strftime("%H:%M:%S", time.gmtime(execution_time))
        st.write(f"⌛ Execution time: {ETA}")
        st.button("Rerun", disabled=False)
        # Clean up intermediate files between runs (I think this part can be removed now).
        folder_path = "qids_folder"
        for filename in os.listdir(folder_path):
            file_path = os.path.join(folder_path, filename)
            os.remove(file_path)
        folder_path_1 = "info_extraction"
        for filename in os.listdir(folder_path_1):
            file_path = os.path.join(folder_path_1, filename)
            os.remove(file_path)

if __name__ == "__main__":
    main_cli()