Spaces:
Paused
Paused
| import os | |
| import sqlite3 | |
| from whoosh import index | |
| from whoosh.fields import Schema, ID, TEXT | |
def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def extract_contents_from_db(db_path, max_len=25):
    """
    Extract short text values from every user table of a SQLite database.

    Each non-null value of each column is converted with ``str()`` and
    stripped of surrounding whitespace; values whose stripped length is in
    ``1..max_len`` are kept. A value is reported once per (table, column),
    so the same text may still appear under several columns.

    Args:
        db_path: Path to the SQLite database file.
        max_len: Maximum length (after stripping) of a value to keep.

    Returns:
        List of tuples [(doc_id, text), ...] where ``doc_id`` has the form
        ``"<table>-<column>-<digest>"``. The digest is derived from the
        text with SHA-256, so IDs are identical across runs.

    Raises:
        sqlite3.Error: If the database cannot be read. The connection is
            closed before the exception propagates.
    """
    # Local import keeps this function self-contained; the built-in hash()
    # is salted per process for strings and would give unstable IDs.
    import hashlib

    docs = []
    seen = set()
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()

        # Materialize the table list before issuing further queries:
        # re-executing on a cursor that is still being iterated discards
        # its pending rows, which would end the table loop after one table.
        table_names = [
            name
            for (name,) in cur.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
        ]

        for table_name in table_names:
            # Names starting with "sqlite_" are reserved for SQLite's own
            # bookkeeping tables (sqlite_sequence, sqlite_stat1, ...).
            if table_name.startswith("sqlite_"):
                continue

            quoted_table = _quote_identifier(table_name)

            # PRAGMA table_info returns rows like (cid, name, type, ...);
            # the column name is at index 1.
            pragma_rows = cur.execute(
                f"PRAGMA table_info({quoted_table})"
            ).fetchall()
            cols = [row[1] for row in pragma_rows]

            for col in cols:
                quoted_col = _quote_identifier(col)
                query = (
                    f"SELECT DISTINCT {quoted_col} FROM {quoted_table} "
                    f"WHERE {quoted_col} IS NOT NULL"
                )
                for (val,) in cur.execute(query):
                    text = str(val).strip()
                    if not 0 < len(text) <= max_len:
                        continue

                    # DISTINCT runs before strip(), so "a" and "a " would
                    # otherwise both be emitted as "a".
                    key = (table_name, col, text)
                    if key in seen:
                        continue
                    seen.add(key)

                    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
                    doc_id = f"{table_name}-{col}-{digest}"
                    docs.append((doc_id, text))
    finally:
        conn.close()
    return docs
def build_index_for_db(db_id, db_path, index_root="db_contents_index"):
    """
    Build (or open) a Whoosh index for a single database.

    - If an index already exists in index_root/db_id, it is opened and
      returned unchanged; the SQLite file is not read.
    - Otherwise the SQLite file is read first, then a new index is created
      and populated with one document per extracted (doc_id, text) pair.

    Args:
        db_id: Name of the sub-directory under ``index_root`` for this index.
        db_path: Path to the SQLite database file to index.
        index_root: Directory that holds one index directory per database.

    Returns:
        The opened or newly created Whoosh index object.
    """
    index_dir = os.path.join(index_root, db_id)
    os.makedirs(index_dir, exist_ok=True)

    # Open existing index if present
    if index.exists_in(index_dir):
        return index.open_dir(index_dir)

    # Read the database BEFORE creating the index: if extraction fails, no
    # empty index is left behind to be mistaken for a finished one by the
    # exists_in() check on the next call.
    docs = extract_contents_from_db(db_path)

    # Schema: unique ID + stored, searchable text field
    schema = Schema(
        id=ID(stored=True, unique=True),
        content=TEXT(stored=True),
    )
    ix = index.create_in(index_dir, schema)

    # As a context manager the writer commits on normal exit and cancels
    # (releasing the write lock) if adding a document raises.
    with ix.writer() as writer:
        for doc_id, text in docs:
            writer.add_document(id=doc_id, content=text)
    return ix
if __name__ == "__main__":
    DATABASE_ROOT = "databases"
    INDEX_ROOT = "db_contents_index"

    # Wipe any index tree left by a previous run so every index below is
    # rebuilt from its database instead of being reopened.
    if os.path.isdir(INDEX_ROOT):
        import shutil

        shutil.rmtree(INDEX_ROOT)
    os.makedirs(INDEX_ROOT, exist_ok=True)

    # Expected layout: databases/<db_id>/<db_id>.sqlite
    for db_id in os.listdir(DATABASE_ROOT):
        db_file = os.path.join(DATABASE_ROOT, db_id, f"{db_id}.sqlite")
        if not os.path.isfile(db_file):
            # Entry has no matching .sqlite file; nothing to index.
            continue
        print(f"Building Whoosh index for {db_id}...")
        build_index_for_db(db_id, db_file, INDEX_ROOT)

    print("All indexes built successfully.")