import asyncio
import hashlib
import json
import os
import shutil
import sys
import tempfile
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
from glob import glob
from itertools import product
from time import sleep

import aiohttp
import jsonlines
import PyPDF2
import requests
from tqdm import tqdm

from presentation import Presentation
from utils import Config, tenacity

topics = [
    "culture",
    "education",
    "science",
    "society",
    "technology",
]

BANNED_LICENSES = [
    "unknown",
    "cc-by-nc-3.0",
    "cc-by-nc-4.0",
    "cc-by-nc-nd-1.0",
    "cc-by-nc-nd-4.0",
    "cc-by-nd-1.0",
    "cc-by-nd-2.5",
    "cc-by-nd-4.0",
    "other-open",
    "other-at",
    "other-pd",
    "other-closed",
]


@tenacity
def search_zenodo(
    sort: str,
    query: str = None,
    page: int = 1,
    filetype: str = "pptx",
    size: int = 500,
) -> dict:
    """Query the Zenodo records API for open-access records carrying the given file type."""
    url = "https://zenodo.org/api/records"
    params = {
        "size": size,
        "page": page,
        "access_right": "open",
        "file_type": filetype,
        "sort": sort,
    }
    if query is not None:
        params["q"] = query
    response = requests.get(url, params=params)
    response.raise_for_status()
    return response.json()

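# Illustrative sketch of the response shape, trimmed to the keys this script
# actually reads (see the Zenodo REST API docs for the full schema):
#
#   {
#       "hits": {
#           "total": 12345,
#           "hits": [
#               {
#                   "title": "...",
#                   "created": "2020-01-01T00:00:00+00:00",
#                   "updated": "...",
#                   "doi": "...",
#                   "metadata": {"license": {"id": "cc-by-4.0"}},
#                   "files": [
#                       {
#                           "key": "slides.pptx",
#                           "size": 1048576,
#                           "checksum": "md5:<hex digest>",
#                           "links": {"self": "https://zenodo.org/api/..."},
#                       }
#                   ],
#               }
#           ],
#       }
#   }
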
@tenacity
def iter_zenodo(
    sort: str,
    query: str = None,
    maxpage: int = None,
    filetype: str = "pptx",
    page: int = 1,
):
    """Yield (record, page) pairs from search_zenodo, paging from `page` up to `maxpage`."""
    while True:
        if maxpage is not None and page > maxpage:
            break
        for record in search_zenodo(sort, query, page, filetype, 500)["hits"]["hits"]:
            yield record, page
        page += 1
        # 20 pages x 500 hits: Zenodo's search API appears to reject pagination
        # past 10,000 results, so stop there regardless of maxpage.
        if page > 20:
            break

def ppt_validate(filename: str):
    """Heuristically filter presentations: parseable, 8-64 slides, a sane
    image count, few parse errors, and reused slide layouts."""
    with tempfile.TemporaryDirectory() as temp_dir:
        config = Config(rundir=temp_dir, debug=False)
        try:
            presentation = Presentation.from_file(filename, config)
        except Exception:
            return False
        num_images = len(os.listdir(config.IMAGE_DIR))
    if num_images > 3 * len(presentation) or num_images == 0:
        return False
    if len(presentation) < 8 or len(presentation) > 64:
        return False
    # Reject decks where more than half of the slides failed to parse.
    if len(presentation.error_history) > len(presentation) / 2:
        return False
    layout_count = defaultdict(int)
    for slide in presentation.slides:
        layout_count[slide.slide_layout_name] += 1
    # Require layouts to be reused: at least two slides per layout on average.
    if sum(layout_count.values()) / len(layout_count) < 2:
        return False
    return True

def pdf_validate(filename: str):
    """Keep only PDFs that PyPDF2 can parse and that have at most 16 pages."""
    try:
        with open(filename, "rb") as f:
            num_pages = len(PyPDF2.PdfReader(f).pages)
    except Exception:
        return False
    if num_pages > 16:
        return False
    return True

async def download_file(
    session: aiohttp.ClientSession, filepath: str, url: str, pbar: tqdm = None
) -> None:
    """Download url to filepath using the Zenodo access token; 404s are skipped silently."""
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    async with session.get(
        url, params={"access_token": os.environ["ZENODO_TOKEN"]}
    ) as response:
        if response.status == 200:
            with open(filepath, "wb") as f:
                f.write(await response.read())
        elif response.status != 404:
            raise Exception(f"Failed to download {filepath}: {response.status}")
    if pbar is not None:
        pbar.update(1)

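# Minimal usage sketch (illustrative: the URL would come from a record's
# file["links"]["self"] as returned by search_zenodo, and ZENODO_TOKEN must be
# set in the environment):
#
#   async def fetch_one(url: str) -> None:
#       async with aiohttp.ClientSession() as session:
#           await download_file(session, "downloads/example.pptx", url)
#
#   asyncio.run(fetch_one(some_record_file_url))
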
def collect_links(jsonl_file: str) -> None:
    """Index every .pptx file of Zenodo's most recent open records into jsonl_file."""
    page = 1
    progress_bar = None
    with jsonlines.open(jsonl_file, mode="w") as writer:
        while progress_bar is None or progress_bar.n < progress_bar.total:
            try:
                results = search_zenodo(page=page, sort="-mostrecent")
            except Exception as e:
                tqdm.write(f"Error {e}, current page: {page}")
                continue  # retry the same page
            if progress_bar is None:
                progress_bar = tqdm(
                    initial=(page - 1) * 500,
                    total=results["hits"]["total"],
                    desc="Processing pages",
                )
            progress_bar.update(len(results["hits"]["hits"]))
            records = []
            for record in results["hits"]["hits"]:
                for file in record["files"]:
                    if not file["key"].endswith(".pptx"):
                        continue
                    license = record["metadata"].get("license", {"id": "unknown"})["id"]
                    if license == "unknown":
                        continue
                    records.append(
                        {
                            "filename": file["key"],
                            "size": file["size"],
                            "url": file["links"]["self"],
                            "license": license,
                            "title": record["title"],
                            "created": record["created"],
                            "updated": record["updated"],
                            "doi": record.get("doi", "unknown"),
                            "checksum": file["checksum"],
                        }
                    )
            writer.write_all(records)
            page += 1

async def download_links(jsonl_file: str) -> None:
    """Download every file indexed in jsonl_file, in throttled batches of 80."""
    async with aiohttp.ClientSession() as session:
        with jsonlines.open(jsonl_file) as reader:
            tasks = list(reader)
        progress_bar = tqdm(total=len(tasks), desc="Downloading files")
        task_iter = iter(tasks)
        coroutines = []
        while True:
            while len(coroutines) < 80:
                task = next(task_iter, None)
                if task is None:
                    break
                dirname = f"zenodo-pptx/pptx/{task['license']}/{task['created'][:4]}/"
                basename = f"{task['checksum'][4:]}-{task['filename']}"
                filepath = dirname + basename
                try:
                    # Probe that the filename is legal on this filesystem;
                    # fall back to a truncated name if it is not.
                    open("/tmp/" + basename, "wb").close()
                except OSError:
                    filepath = dirname + basename[:240] + ".pptx"
                if os.path.exists(filepath):
                    progress_bar.update(1)
                    continue
                coroutines.append(
                    download_file(session, filepath, task["url"], progress_bar)
                )

            start_time = time.time()
            results = await asyncio.gather(*coroutines, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    tqdm.write(f"Error {result}")
            # A batch smaller than 80 means the task list is exhausted.
            # (Checking len % 80 != 0 would loop forever on an empty final batch.)
            if len(coroutines) < 80:
                return
            coroutines.clear()
            # Pace the crawl to roughly one batch of 80 requests per minute.
            elapsed_time = time.time() - start_time
            sleep(max(60 - elapsed_time, 0))

async def gather_files(topics: list[str], num_results: int) -> None:
    """Curate up to num_results validated files per (topic, filetype) pair,
    reusing the bulk crawl where possible and resuming from data/datastats.jsonl."""
    session = aiohttp.ClientSession()
    filetypes = ["pptx", "pdf"]
    progress_bar = tqdm(
        total=len(topics) * len(filetypes) * num_results, desc="Gathering files"
    )
    writer = jsonlines.open("data/datastats.jsonl", mode="a")
    existed = defaultdict(list)
    for i in jsonlines.open("data/datastats.jsonl"):
        existed[i["topic"] + i["filetype"]].append(i)
    for topic, filetype in product(topics, filetypes):
        selected = existed.get(topic + filetype, [])
        progress_bar.set_description(f"Gathering {topic} {filetype}")
        progress_bar.update(len(selected))
        # Resume from the page after the last one recorded for this pair.
        page = 1 if len(selected) == 0 else selected[-1]["page"] + 1
        for record, page in iter_zenodo(
            query=topic, filetype=filetype, sort="mostviewed", page=page
        ):
            if len(selected) >= num_results:
                break
            license = record["metadata"].get("license", {"id": "unknown"})["id"]
            if license in BANNED_LICENSES:
                continue
            for file in record["files"]:
                if not file["key"].endswith(f".{filetype}"):
                    continue
                filepath = f"zenodo-pptx/{filetype}/{license}/{record['created'][:4]}/{file['checksum'][4:]}-{file['key']}"
                # rsplit with maxsplit=1 strips only the extension; a bare
                # rsplit(".") would truncate multi-dot filenames at the first dot.
                dst = f"data/{topic}/{filetype}/{file['key'].rsplit('.', 1)[0]}/original.{filetype}"
                if os.path.exists(dst):
                    continue
                # exist_ok: a failed download from an earlier run may have left
                # the directory behind without the file.
                os.makedirs(os.path.dirname(dst), exist_ok=True)
                # Prefer the local copy from the bulk crawl; otherwise download.
                if os.path.exists(filepath):
                    shutil.copy(filepath, dst)
                else:
                    try:
                        await download_file(session, dst, file["links"]["self"])
                    except Exception:
                        continue
                if (filetype == "pptx" and not ppt_validate(dst)) or (
                    filetype == "pdf" and not pdf_validate(dst)
                ):
                    shutil.rmtree(os.path.dirname(dst))
                    continue
                selected.append(
                    {
                        "filename": file["key"],
                        "size": file["size"],
                        "url": file["links"]["self"],
                        "license": license,
                        "title": record["title"],
                        "created": record["created"],
                        "updated": record["updated"],
                        "doi": record.get("doi", "unknown"),
                        "checksum": file["checksum"],
                        "page": page,
                        "topic": topic,
                        "filetype": filetype,
                    }
                )
                writer.write(selected[-1])
                progress_bar.update(1)
        progress_bar.update(num_results - len(selected))
    # Release resources that were opened without context managers.
    writer.close()
    await session.close()

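# On-disk layout produced by the functions above (reconstructed from the
# format strings in this file):
#
#   zenodo-pptx/<filetype>/<license>/<year>/<md5>-<filename>   # bulk crawl (download_links)
#   data/<topic>/<filetype>/<stem>/original.<filetype>         # curated copies (gather_files)
#   data/datastats.jsonl                                       # per-file metadata (gather_files)
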
def verify_md5(filepath):
    """Recompute the file's MD5 and compare it with the digest embedded in its filename."""
    expected_md5 = os.path.basename(filepath).split("-")[0]
    hash_md5 = hashlib.md5()
    try:
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
    except Exception as e:
        print(f"Error processing file {filepath}: {e}")
        return filepath  # an unreadable file counts as incorrect
    if hash_md5.hexdigest() != expected_md5:
        print("found incorrect file:", filepath)
        return filepath
    return None

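# Filename convention tying the crawl to this check: Zenodo reports checksums
# as "md5:<hexdigest>", download_links strips the "md5:" prefix and prepends
# the digest to the filename, and verify_md5 recovers it as everything before
# the first "-". Illustrative example (not a real record):
#
#   checksum  = "md5:9e107d9d372bb6826bd81d3542a419d6"
#   filename  = "slides.pptx"
#   stored as   "9e107d9d372bb6826bd81d3542a419d6-slides.pptx"
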
def check_files_md5_parallel(directory: str, num_workers: int = 8):
    filepaths = []
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(".pptx") or file.endswith(".pdf"):
                filepaths.append(os.path.join(root, file))
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        result = list(executor.map(verify_md5, filepaths))
    with jsonlines.open("incorrect_files.jsonl", mode="w") as writer:
        for i in result:
            if i is not None:
                writer.write(i)

def dataset_stat():
    """Print per-topic text/image statistics and dump them to data/eval/stat.json."""
    pdf_stat = {}
    ppt_stat = {}
    tempdir = tempfile.TemporaryDirectory()
    config = Config()
    config.set_rundir(tempdir.name)
    for topic in topics:
        markdown_contents = {
            f: len(open(f, "r").read()) for f in glob(f"data/{topic}/pdf/*/*.md")
        }
        pdf_stat |= markdown_contents
        avg_pdf_text_len = sum(markdown_contents.values()) / len(markdown_contents)
        num_images = 0
        for pdf_folder in glob(f"data/{topic}/pdf/*"):
            images = json.load(open(os.path.join(pdf_folder, "image_caption.json")))
            num_images += len(images)
        avg_pdf_images = num_images / len(markdown_contents)
        ppt_text_len = 0
        ppt_pages = 0
        ppt_images = 0
        num_ppts = 10  # assumes exactly 10 presentations per topic in this split
        for ppt_folder in glob(f"data/{topic}/pptx/*"):
            presentation = Presentation.from_file(
                os.path.join(ppt_folder, "source.pptx"), config
            )
            ppt_stat[ppt_folder] = sum(
                len(slide.to_text()) for slide in presentation.slides
            )
            ppt_text_len += ppt_stat[ppt_folder]
            ppt_pages += len(presentation)
            ppt_images += len(os.listdir(os.path.join(ppt_folder, "images")))
        avg_ppt_pages = ppt_pages / num_ppts
        avg_ppt_text_len = ppt_text_len / num_ppts
        avg_ppt_images = ppt_images / num_ppts
        print(
            "topic",
            "avg_pdf_text_len",
            "avg_pdf_images",
            "avg_ppt_pages",
            "avg_ppt_images",
            "avg_ppt_text_len",
        )
        print(
            f"{topic}: {avg_pdf_text_len:.2f}, {avg_pdf_images:.2f}, {avg_ppt_pages:.2f}, {avg_ppt_images:.2f}, {avg_ppt_text_len:.2f}"
        )
    with open("data/eval/stat.json", "w") as f:
        json.dump({"pdf": pdf_stat, "ppt": ppt_stat}, f, indent=4)

if __name__ == "__main__":
    jsonl = "zenodo-pptx/filestats.jsonl"
    if sys.argv[1] == "collect":
        collect_links(jsonl)
    elif sys.argv[1] == "download":
        asyncio.run(download_links(jsonl))
    elif sys.argv[1] == "gather":
        asyncio.run(gather_files(topics, 100))
    elif sys.argv[1] == "check":
        check_files_md5_parallel("zenodo-pptx", int(sys.argv[2]))
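# Typical invocation order (the script filename is assumed here; substitute
# the actual module name):
#
#   python zenodo_pptx.py collect      # index .pptx links into filestats.jsonl
#   python zenodo_pptx.py download     # bulk-download everything indexed
#   python zenodo_pptx.py gather       # curate 100 validated files per topic/filetype
#   python zenodo_pptx.py check 8      # verify MD5 checksums with 8 workers
#
# ZENODO_TOKEN must be exported before "download" or "gather".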