import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
from urllib.parse import urlparse
import chainlit as cl

"""
Ref: https://python.plainenglish.io/scraping-the-subpages-on-a-website-ea2d4e3db113
"""


class WebpageCrawler:
    def __init__(self):
        # Tracks root-relative hrefs that have already been seen, so each
        # candidate link is validated at most once
        self.dict_href_links = {}

    def getdata(self, url):
        """Fetch a page and return its raw HTML as text."""
        r = requests.get(url)
        return r.text

    def url_exists(self, url):
        """Return True if a HEAD request to the URL comes back with HTTP 200."""
        try:
            response = requests.head(url)
            return response.status_code == 200
        except requests.ConnectionError:
            return False

    def get_links(self, website_link, base_url=None):
        if base_url is None:
            base_url = website_link
        html_data = self.getdata(website_link)
        soup = BeautifulSoup(html_data, "html.parser")
        list_links = []
        for link in soup.find_all("a", href=True):
            # Keep absolute links that start with the original link
            if str(link["href"]).startswith(str(website_link)):
                list_links.append(link["href"])
            # Resolve root-relative hrefs (those starting with "/") against the base URL
            if str(link["href"]).startswith("/"):
                if link["href"] not in self.dict_href_links:
                    print(link["href"])
                    self.dict_href_links[link["href"]] = None
                    link_with_www = base_url + link["href"][1:]
                    if self.url_exists(link_with_www):
                        print("adjusted link =", link_with_www)
                        list_links.append(link_with_www)
        # Use the links as dictionary keys, with "Not-checked" as the initial value
        dict_links = dict.fromkeys(list_links, "Not-checked")
        return dict_links

    def get_subpage_links(self, link_dict, base_url):
        for link in tqdm(link_dict):
            # If this page has not been crawled yet, crawl it and collect its links
            if link_dict[link] == "Not-checked":
                dict_links_subpages = self.get_links(link, base_url)
                # Mark the link as visited
                link_dict[link] = "Checked"
            else:
                # Nothing new to merge once a link has been checked
                dict_links_subpages = {}
            # Merge the newly found links into the existing dictionary
            link_dict = {**dict_links_subpages, **link_dict}
        return link_dict

    def get_all_pages(self, url, base_url):
        dict_links = {url: "Not-checked"}
        self.dict_href_links = {}
        counter, counter2 = None, 0
        while counter != 0:
            counter2 += 1
            dict_links2 = self.get_subpage_links(dict_links, base_url)
            # Count the links still marked "Not-checked"; the loop exits once
            # every value in the dictionary has been checked
            # https://stackoverflow.com/questions/48371856/count-the-number-of-occurrences-of-a-certain-value-in-a-dictionary-in-python
            counter = sum(value == "Not-checked" for value in dict_links2.values())
            dict_links = dict_links2
        checked_urls = [
            url for url, status in dict_links.items() if status == "Checked"
        ]
        return checked_urls
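

# A minimal usage sketch (illustrative; the URL is a placeholder and network
# access is assumed, so this is left as a comment rather than executed on import):
#
#   crawler = WebpageCrawler()
#   pages = crawler.get_all_pages("https://example.com/", "https://example.com/")
#   print(len(pages), "pages found")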


def get_urls_from_file(file_path: str):
    """Read URLs from a file, one per line, and return them stripped of whitespace."""
    with open(file_path, "r") as f:
        urls = f.readlines()
    urls = [url.strip() for url in urls]
    return urls


def get_base_url(url):
    parsed_url = urlparse(url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
    return base_url
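

# Example (illustrative): get_base_url("https://example.com/docs/intro")
# returns "https://example.com/". The trailing slash matters, because
# get_links strips the leading "/" from relative hrefs before appending
# them to this base.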


def get_sources(res, answer):
    source_elements_dict = {}
    source_elements = []
    found_sources = []
    source_dict = {}  # Maps each source URL to the text chunks retrieved from it

    # Group the retrieved documents by their source URL
    for idx, source in enumerate(res["source_documents"]):
        source_metadata = source.metadata
        url = source_metadata["source"]
        if url not in source_dict:
            source_dict[url] = [source.page_content]
        else:
            source_dict[url].append(source.page_content)

    # Build one Chainlit text element per URL, concatenating its chunks
    for source_idx, (url, text_list) in enumerate(source_dict.items()):
        full_text = ""
        for url_idx, text in enumerate(text_list):
            full_text += f"Source {url_idx + 1}:\n {text}\n\n\n"
        source_elements.append(cl.Text(name=url, content=full_text))
        found_sources.append(url)

    if found_sources:
        answer += f"\n\nSources: {', '.join(found_sources)} "
    else:
        answer += "\n\nNo source found."

    # for idx, source in enumerate(res["source_documents"]):
    #     title = source.metadata["source"]
    #     if title not in source_elements_dict:
    #         source_elements_dict[title] = {
    #             "page_number": [source.metadata["page"]],
    #             "url": source.metadata["source"],
    #             "content": source.page_content,
    #         }
    #     else:
    #         source_elements_dict[title]["page_number"].append(source.metadata["page"])
    #         source_elements_dict[title][
    #             "content_" + str(source.metadata["page"])
    #         ] = source.page_content
    #     # sort the page numbers
    #     # source_elements_dict[title]["page_number"].sort()
    # for title, source in source_elements_dict.items():
    #     # create a string for the page numbers
    #     page_numbers = ", ".join([str(x) for x in source["page_number"]])
    #     text_for_source = f"Page Number(s): {page_numbers}\nURL: {source['url']}"
    #     source_elements.append(cl.Pdf(name="File", path=title))
    #     found_sources.append("File")
    #     # for pn in source["page_number"]:
    #     #     source_elements.append(
    #     #         cl.Text(name=str(pn), content=source["content_"+str(pn)])
    #     #     )
    #     #     found_sources.append(str(pn))
    # if found_sources:
    #     answer += f"\nSource:{', '.join(found_sources)}"
    # else:
    #     answer += f"\nNo source found."
    return answer, source_elements
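

if __name__ == "__main__":
    # Minimal crawl sketch (an assumption, not part of the original module):
    # "urls.txt" is a hypothetical input file with one seed URL per line.
    for seed in get_urls_from_file("urls.txt"):
        crawler = WebpageCrawler()
        pages = crawler.get_all_pages(seed, get_base_url(seed))
        print(f"{seed}: crawled {len(pages)} pages")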