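"""Zotero -> Hugging Face sync pipeline.

Fetches items from a Zotero group library, keeps the arXiv papers, parses their
content from ar5iv HTML (falling back to an external PDF-to-markdown extraction
service), renders PDF pages to JPEG images, and pushes the resulting text and
image datasets to the Hugging Face Hub on a schedule.

Expected environment variables (loaded via dotenv): HF_TOKEN, GROUP_ID, API_KEY.
"""
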
import os
import re
import shutil
import time

import dotenv
import fitz  # PyMuPDF
import pandas as pd
import requests
import schedule
import srsly
from bs4 import BeautifulSoup
from datasets import Dataset, Image, concatenate_datasets, load_dataset
from huggingface_hub import HfApi, create_repo, login, whoami
from PIL import Image as PILImage
from retry import retry
from tqdm.auto import tqdm

dotenv.load_dotenv()
login(token=os.environ.get("HF_TOKEN"))

api = HfApi()
hf_user = whoami(os.environ.get("HF_TOKEN"))["name"]
HF_REPO_ID_TXT = f"{hf_user}/zotero-answer-ai-texts"
HF_REPO_ID_IMG = f"{hf_user}/zotero-answer-ai-images"

########################################################
### GET ZOTERO ITEMS
########################################################


def _fetch_one_zotero_batch(url, headers, params):
    """Fetch one page of items from the Zotero API."""
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    return response.json()


def get_zotero_items(debug=False):
    """Fetch all items from the Zotero group library, 100 at a time."""
    GROUP_ID = os.getenv("GROUP_ID")
    API_KEY = os.getenv("API_KEY")
    BASE_URL = f"https://api.zotero.org/groups/{GROUP_ID}/items"
    LIMIT = 100

    headers = {"Zotero-API-Key": API_KEY, "Content-Type": "application/json"}

    items = []
    start = 0

    while True:
        params = {"limit": LIMIT, "start": start}
        page_items = _fetch_one_zotero_batch(BASE_URL, headers, params)

        if not page_items:
            break

        items.extend(page_items)
        start += LIMIT
        print(f"# items fetched {len(items)}")

        if debug and len(items) > 1600:
            break

    return items

########################################################
### EXTRACT ARXIV LINKS AND PDFs
########################################################


def get_arxiv_items(items):
    """Filter Zotero items down to unique arXiv papers with basic metadata."""
    visited = set()
    arxiv_items = []
    arxiv_pattern = re.compile(r"arxiv\.org/abs/(\d+\.\d+)")

    for item in items:
        data = item.get("data", {})
        attachments = item.get("links", {}).get("attachment", {})

        arxiv_url = None
        pdf_url = None

        if "url" in data and "arxiv.org" in data["url"]:
            arxiv_match = arxiv_pattern.search(data["url"])
            if arxiv_match:
                arxiv_url = data["url"]
                if attachments:
                    pdf_url = attachments.get("href")

        if arxiv_url:
            arxiv_id = arxiv_url.split("/")[-1]
            if arxiv_id in visited:
                continue

            authors = []
            for author in data.get("creators", []):
                authors.append(f"{author.get('firstName', '')} {author.get('lastName', '')}")

            arxiv_items.append(
                {
                    "arxiv_id": arxiv_id,
                    "arxiv_url": arxiv_url,
                    "title": data.get("title", ""),
                    "authors": authors,
                    "pdf_url": pdf_url,
                    "date_published": data.get("date", ""),
                    "added_by": item["meta"]["createdByUser"]["username"],
                    "date_added": data.get("dateAdded", ""),
                }
            )
            visited.add(arxiv_id)

    return arxiv_items


def fetch_arxiv_html(arxiv_id):
    """Fetch the ar5iv HTML rendering of a paper, dropping any version suffix."""
    url = f"https://ar5iv.labs.arxiv.org/html/{arxiv_id.split('v')[0]}"
    response = requests.get(url)
    return response.text if response.status_code == 200 else None


def fetch_arxiv_htmls(arxiv_items):
    """Attach raw ar5iv HTML to each item; items that fail get raw_content = 'Error'."""
    for item in tqdm(arxiv_items):
        html = fetch_arxiv_html(item["arxiv_id"])
        if html:
            item["raw_content"] = html
        else:
            print(f"failed to fetch html for {item['arxiv_id']}")
            item["raw_content"] = "Error"

    return arxiv_items

########################################################
### PARSE CONTENT FROM ARXIV HTML
########################################################


def parse_html_content(html):
    """Parse section-level content from an ar5iv HTML page."""
    arxiv_id_match = re.search(r"\[(\d+\.\d+(v\d+)?)\]", html)
    arxiv_id = arxiv_id_match.group(1) if arxiv_id_match else None
    soup = BeautifulSoup(html, "html.parser")
    result = []

    # Extract paper title
    try:
        paper_title = soup.find("h1", class_="ltx_title ltx_title_document").get_text(strip=True)
    except Exception:
        paper_title = soup.find("title").get_text(strip=True)
        paper_title = re.sub(r"^\[\d+\.\d+(v\d+)?\]\s*", "", paper_title)

    # Drop inline math and citation markup before extracting text
    for math in soup.find_all("math"):
        math.decompose()
    for cite in soup.find_all("cite"):
        cite.decompose()

    # Extract abstract
    abstract = soup.find("div", class_="ltx_abstract")
    if abstract:
        result.append(
            {
                "content": " ".join(p.get_text(strip=True) for p in abstract.find_all("p")).replace(")", ") "),
                "title": "Abstract",
                "paper_title": paper_title,
                "content_type": "abstract",
            }
        )

    # Extract sections
    sections = soup.find_all("section", class_="ltx_section")
    for index, section in enumerate(sections):
        section_title = section.find("h2", class_="ltx_title ltx_title_section")
        section_title = section_title.get_text(strip=True) if section_title else f"Section {index + 1}"
        section_content = section.get_text(strip=True).replace(")", ") ")

        content_type = "body"
        if index == 0:
            content_type = "introduction"
        elif index == len(sections) - 1:
            content_type = "conclusion"

        result.append(
            {
                "content": section_content,
                "title": section_title,
                "paper_title": paper_title,
                "content_type": content_type,
            }
        )

    for c in result:
        c["arxiv_id"] = arxiv_id

    return result
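
# Each entry returned by parse_html_content carries "content", "title",
# "paper_title", "content_type" ("abstract", "introduction", "body", or
# "conclusion") and "arxiv_id"; <math> and <cite> nodes are removed so only
# prose text is kept.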

########################################################
### GET TEXTS FROM PDF & PARSE
########################################################


def get_pdf_text(arxiv_id):
    """Fetch markdown-like text for a paper from the external extraction service."""
    url = "http://147.189.194.113:80/extract"  # fix: currently down
    try:
        response = requests.get(url, params={"arxiv_id": arxiv_id})
        response = response.json()
        if "text" in response:
            return response["text"]
        return None
    except Exception as e:
        print(e)
        return None


def get_content_type(section_type, section_count):
    """Determine the content type based on the section type and count."""
    if section_type == "abstract":
        return "abstract"
    elif section_type == "introduction" or section_count == 1:
        return "introduction"
    elif section_type == "conclusion" or section_type == "references":
        return section_type
    else:
        return "body"


def get_section_type(title):
    """Determine the section type based on the title."""
    title_lower = title.lower()
    if "abstract" in title_lower:
        return "abstract"
    elif "introduction" in title_lower:
        return "introduction"
    elif "conclusion" in title_lower:
        return "conclusion"
    elif "reference" in title_lower:
        return "references"
    else:
        return "body"


def parse_markdown_content(md_content, arxiv_id):
    """Parse markdown content into sections based on headers."""
    lines = md_content.split("\n")
    parsed = []
    current_section = None
    content = []
    paper_title = None
    current_title = None

    # identify sections based on headers
    for line in lines:
        if line.startswith("#"):
            if paper_title is None:
                paper_title = line.lstrip("#").strip()
                continue
            if content:
                if current_title:
                    parsed.append(
                        {
                            "content": " ".join(content),
                            "title": current_title,
                            "paper_title": paper_title,
                            "content_type": get_content_type(current_section, len(parsed)),
                            "arxiv_id": arxiv_id,
                        }
                    )
                content = []

            current_title = line.lstrip("#").lstrip()
            # Strip leading section numbers such as "1." or "2.3"; titles containing
            # "bit" are left as-is (presumably to avoid mangling names like "1-bit")
            if "bit" not in current_title:
                current_title = (
                    current_title.lstrip("123456789")
                    .lstrip()
                    .lstrip(".")
                    .lstrip()
                    .lstrip("123456789")
                    .lstrip()
                    .lstrip(".")
                    .lstrip()
                )
            current_section = get_section_type(current_title)
        else:
            content.append(line)

    # Add the last section
    if content and current_title:
        parsed.append(
            {
                "content": " ".join(content).replace(")", ") "),
                "title": current_title,
                "paper_title": paper_title,
                "content_type": get_content_type(current_section, len(parsed)),
                "arxiv_id": arxiv_id,
            }
        )

    return parsed
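
# Rough illustration on a hypothetical input:
#   parse_markdown_content("# A Title\n## Abstract\ntext\n## 1 Introduction\nmore text", "0000.00000")
# yields one entry per header: {"title": "Abstract", "content_type": "abstract", ...}
# followed by {"title": "Introduction", "content_type": "introduction", ...},
# both with paper_title "A Title" and the leading section number stripped.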

########################################################
### Image Dataset
########################################################


def download_arxiv_pdf(arxiv_id):
    """Download the PDF for a paper from arxiv.org (version suffix stripped)."""
    arxiv_id = arxiv_id.split("v")[0]
    url = f"https://arxiv.org/pdf/{arxiv_id}.pdf"
    response = requests.get(url)

    if response.status_code == 200:
        return response.content
    else:
        raise Exception(f"Failed to download PDF. Status code: {response.status_code}")


def pdf_to_jpegs(pdf_content, output_folder, max_pages=128):
    """Render each page of the PDF to a JPEG in output_folder, up to max_pages pages."""
    # Create output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    # Open the PDF
    doc = fitz.open(stream=pdf_content, filetype="pdf")

    # Iterate through pages
    for page_num in range(len(doc)):
        page = doc.load_page(page_num)

        # Convert page to image
        pix = page.get_pixmap()

        # Save image as JPEG
        image_path = os.path.join(output_folder, f"page_{page_num + 1}.jpg")
        pix.save(image_path)
        # print(f"Saved {image_path}")

        if page_num + 1 >= max_pages:
            break

    doc.close()
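
# Note: page.get_pixmap() with no arguments uses PyMuPDF's default 72 dpi render;
# a sharper image could be requested with e.g. page.get_pixmap(matrix=fitz.Matrix(2, 2))
# if higher-resolution page images are ever needed.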

def save_arxiv_article_images(arxiv_id):
    """Download a paper's PDF and save its pages as JPEGs under data/arxiv_images/<arxiv_id>."""
    output_folder = os.path.join("data", "arxiv_images", arxiv_id)

    try:
        pdf_content = download_arxiv_pdf(arxiv_id)
        pdf_to_jpegs(pdf_content, output_folder)
    except Exception as e:
        print(f"An error occurred: {str(e)}")


def create_hf_image_dataset(base_dir):
    """Build a Hugging Face dataset of page images found under base_dir."""
    data = []

    # Walk through the directory
    for root, dirs, files in os.walk(base_dir):
        for file in files:
            if file.endswith(".jpg"):
                # Extract arxiv_id from the directory name
                arxiv_id = os.path.basename(root)

                # Extract page number from the filename
                match = re.search(r"page_(\d+)", file)
                if match:
                    page_number = int(match.group(1))
                else:
                    continue  # Skip if page number can't be extracted

                # Full path to the image
                image_path = os.path.join(root, file)

                # Open the image to get its size
                with PILImage.open(image_path) as img:
                    width, height = img.size

                # Add the data
                data.append(
                    {"image": image_path, "arxiv_id": arxiv_id, "page_number": page_number, "width": width, "height": height}
                )

    # Create the dataset (width/height are collected above but not pushed as columns)
    dataset = Dataset.from_dict(
        {
            "image": [d["image"] for d in data],
            "arxiv_id": [d["arxiv_id"] for d in data],
            "page_number": [d["page_number"] for d in data],
        }
    )

    # Cast the image column to Image so files are loaded as PIL images
    dataset = dataset.cast_column("image", Image())

    return dataset

########################################################
### HF UPLOAD
########################################################


def upload_to_hf(abstract_df, contents_df, processed_arxiv_ids):
    """Push the text, image, and bookkeeping datasets to the Hugging Face Hub."""
    create_repo(
        repo_id=HF_REPO_ID_TXT,
        token=os.environ.get("HF_TOKEN"),
        private=True,
        repo_type="dataset",
        exist_ok=True,
    )

    create_repo(
        repo_id=HF_REPO_ID_IMG,
        token=os.environ.get("HF_TOKEN"),
        private=True,
        repo_type="dataset",
        exist_ok=True,
    )

    # upload image dataset
    try:
        img_ds = create_hf_image_dataset("data/arxiv_images")
        try:
            old_img_ds = load_dataset(HF_REPO_ID_IMG, "images")["train"]
            img_ds = concatenate_datasets([old_img_ds, img_ds])
        except Exception as e:
            print(e)
        img_ds.push_to_hub(HF_REPO_ID_IMG, "images", token=os.environ.get("HF_TOKEN"))
    except Exception as e:
        print(e)

    # upload first pages only
    try:
        img_ds = img_ds.filter(lambda x: x["page_number"] == 1)
        img_ds.push_to_hub(HF_REPO_ID_IMG, "images_first_page", token=os.environ.get("HF_TOKEN"))
    except Exception as e:
        print(e)

    try:
        # push id_to_abstract
        abstract_ds = Dataset.from_pandas(abstract_df)
        abstract_ds.push_to_hub(HF_REPO_ID_TXT, "abstracts", token=os.environ.get("HF_TOKEN"))

        # push arxiv_items
        arxiv_ds = Dataset.from_pandas(contents_df)
        arxiv_ds.push_to_hub(HF_REPO_ID_TXT, "articles", token=os.environ.get("HF_TOKEN"))

        # push processed_arxiv_ids
        processed_arxiv_ids = [{"arxiv_id": arxiv_id} for arxiv_id in processed_arxiv_ids]
        processed_arxiv_ids_ds = Dataset.from_list(processed_arxiv_ids)
        processed_arxiv_ids_ds.push_to_hub(HF_REPO_ID_TXT, "processed_arxiv_ids", token=os.environ.get("HF_TOKEN"))
    except Exception as e:
        print(e)

    # trigger refresh of connected datasets
    print("==" * 40)
    print("Triggering refresh of connected datasets")
    api.restart_space(repo_id="answerdotai/zotero-weekly")
    print("==" * 40)
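
# A minimal sketch (not part of the pipeline) of how the pushed configs can be
# read back, assuming the default "train" split created by push_to_hub:
#
#   from datasets import load_dataset
#   abstracts = load_dataset(HF_REPO_ID_TXT, "abstracts")["train"]
#   articles = load_dataset(HF_REPO_ID_TXT, "articles")["train"]
#   first_pages = load_dataset(HF_REPO_ID_IMG, "images_first_page")["train"]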

########################################################
### MAIN
########################################################


def main():
    # items = get_zotero_items(debug=True)
    items = get_zotero_items(debug=False)
    print(f"# of items fetched from zotero: {len(items)}")

    arxiv_items = get_arxiv_items(items)
    print(f"# of arxiv papers: {len(arxiv_items)}")

    # get already processed arxiv ids from HF
    try:
        existing_arxiv_ids = load_dataset(HF_REPO_ID_TXT, "processed_arxiv_ids")["train"]["arxiv_id"]
    except Exception as e:
        print(e)
        existing_arxiv_ids = []
    existing_arxiv_ids = set(existing_arxiv_ids)
    print(f"# of existing arxiv ids: {len(existing_arxiv_ids)}")

    # new arxiv items
    arxiv_items = [item for item in arxiv_items if item["arxiv_id"] not in existing_arxiv_ids]
    arxiv_items = fetch_arxiv_htmls(arxiv_items)
    print(f"# of new arxiv items: {len(arxiv_items)}")

    if len(arxiv_items) == 0:
        print("No new arxiv items to process")
        return

    processed_arxiv_ids = set()
    pbar = tqdm(total=len(arxiv_items))

    # remove "data" directory if it exists
    if os.path.exists("data"):
        try:
            shutil.rmtree("data")
        except Exception as e:
            print(e)

    for item in arxiv_items:
        # download images --
        save_arxiv_article_images(item["arxiv_id"])

        # parse html
        try:
            item["contents"] = parse_html_content(item["raw_content"])
        except Exception as e:
            print(f"Failed to parse html for {item['arxiv_id']}: {e}")
            item["contents"] = []

        if len(item["contents"]) == 0:
            print("Extracting from pdf...")
            md_content = get_pdf_text(item["arxiv_id"])  # fix this
            item["raw_content"] = md_content

            if md_content:
                item["contents"] = parse_markdown_content(md_content, item["arxiv_id"])
            else:
                item["contents"] = []

        if len(item["contents"]) > 0:
            processed_arxiv_ids.add(item["arxiv_id"])
            if len(item["authors"]) == 0:
                item["authors"] = []  # ["unknown"]
                item["title"] = item["contents"][0]["paper_title"]
        pbar.update(1)
    pbar.close()

    # save contents ---
    processed_arxiv_ids = list(processed_arxiv_ids)
    print(f"# of processed arxiv ids: {len(processed_arxiv_ids)}")

    # save abstracts ---
    id_to_abstract = {}
    for item in arxiv_items:
        for entry in item["contents"]:
            if entry["content_type"] == "abstract":
                id_to_abstract[item["arxiv_id"]] = entry["content"]
                break
    print(f"# of abstracts: {len(id_to_abstract)}")

    abstract_df = pd.Series(id_to_abstract).reset_index().rename(columns={"index": "arxiv_id", 0: "abstract"})
    print(abstract_df.head())

    # add to existing dataset
    try:
        old_abstract_df = load_dataset(HF_REPO_ID_TXT, "abstracts")["train"].to_pandas()
    except Exception as e:
        print(e)
        old_abstract_df = pd.DataFrame(columns=abstract_df.columns)
    print(old_abstract_df.head())

    abstract_df = pd.concat([old_abstract_df, abstract_df]).reset_index(drop=True)
    abstract_df = abstract_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True)

    # contents
    contents_df = pd.DataFrame(arxiv_items)
    print(contents_df.head())
    try:
        old_contents_df = load_dataset(HF_REPO_ID_TXT, "articles")["train"].to_pandas()
    except Exception as e:
        print(e)
        old_contents_df = pd.DataFrame(columns=contents_df.columns)
    if len(old_contents_df) > 0:
        print(old_contents_df.sample().T)

    contents_df = pd.concat([old_contents_df, contents_df]).reset_index(drop=True)
    contents_df = contents_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True)

    # upload to hf
    processed_arxiv_ids = list(set(processed_arxiv_ids + list(existing_arxiv_ids)))
    upload_to_hf(abstract_df, contents_df, processed_arxiv_ids)

    # save a local copy
    os.makedirs("data", exist_ok=True)
    abstract_df.to_parquet("data/abstracts.parquet")
    contents_df.to_parquet("data/contents.parquet")
    srsly.write_json("data/processed_arxiv_ids.json", processed_arxiv_ids)


def schedule_periodic_task():
    """Schedule the main task to run at the user-defined frequency."""
    # main()  # run once initially

    frequency = "daily"  # TODO: read from env
    if frequency == "hourly":
        print("Scheduling tasks to run every hour at the top of the hour")
        schedule.every().hour.at(":00").do(main)
    elif frequency == "daily":
        start_time = "10:00"
        print(f"Scheduling tasks to run every day at {start_time} UTC+00")
        schedule.every().day.at(start_time).do(main)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_periodic_task()
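
# For a one-off refresh without the scheduler, call main() directly instead of
# schedule_periodic_task() (see the commented-out call at the top of that function).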