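"""Migrate the Crawl4AI SQLite database from inline content columns to
file-based, hash-addressed content storage (one file per content hash)."""
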
import os
import asyncio
from pathlib import Path
import aiosqlite
from typing import Optional
import xxhash
import aiofiles
import shutil
from datetime import datetime
from .async_logger import AsyncLogger, LogLevel

# Initialize logger
logger = AsyncLogger(log_level=LogLevel.DEBUG, verbose=True)


class DatabaseMigration:
    def __init__(self, db_path: str):
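        """Prepare hash-addressed content directories next to the database."""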
        self.db_path = db_path
        self.content_paths = self._ensure_content_dirs(os.path.dirname(db_path))

    def _ensure_content_dirs(self, base_path: str) -> dict:
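        """Create one directory per content type under base_path and return
        a mapping of content-type key to directory path."""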
        dirs = {
            'html': 'html_content',
            'cleaned': 'cleaned_html',
            'markdown': 'markdown_content',
            'extracted': 'extracted_content',
            'screenshots': 'screenshots'
        }
        content_paths = {}
        for key, dirname in dirs.items():
            path = os.path.join(base_path, dirname)
            os.makedirs(path, exist_ok=True)
            content_paths[key] = path
        return content_paths

    def _generate_content_hash(self, content: str) -> str:
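        """Return the xxh64 hex digest of content. xxhash is a fast
        non-cryptographic hash, suitable here for deduplication only."""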
        x = xxhash.xxh64()
        x.update(content.encode())
        return x.hexdigest()

    async def _store_content(self, content: str, content_type: str) -> str:
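        """Write content to a file named after its hash (skipping files that
        already exist, so identical content is stored once) and return the
        hash. Empty content maps to an empty string."""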
        if not content:
            return ""
        content_hash = self._generate_content_hash(content)
        file_path = os.path.join(self.content_paths[content_type], content_hash)
        if not os.path.exists(file_path):
            async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
                await f.write(content)
        return content_hash

    async def migrate_database(self):
        """Migrate existing database rows to file-based storage."""
        logger.info("Starting database migration...", tag="INIT")
        try:
            async with aiosqlite.connect(self.db_path) as db:
                # Get all rows
                async with db.execute(
                    '''SELECT url, html, cleaned_html, markdown,
                       extracted_content, screenshot FROM crawled_data'''
                ) as cursor:
                    rows = await cursor.fetchall()

                migrated_count = 0
                for row in rows:
                    url, html, cleaned_html, markdown, extracted_content, screenshot = row

                    # Store content in files and get hashes
                    html_hash = await self._store_content(html, 'html')
                    cleaned_hash = await self._store_content(cleaned_html, 'cleaned')
                    markdown_hash = await self._store_content(markdown, 'markdown')
                    extracted_hash = await self._store_content(extracted_content, 'extracted')
                    screenshot_hash = await self._store_content(screenshot, 'screenshots')

                    # Update database with hashes
                    await db.execute('''
                        UPDATE crawled_data
                        SET html = ?,
                            cleaned_html = ?,
                            markdown = ?,
                            extracted_content = ?,
                            screenshot = ?
                        WHERE url = ?
                    ''', (html_hash, cleaned_hash, markdown_hash,
                          extracted_hash, screenshot_hash, url))

                    migrated_count += 1
                    if migrated_count % 100 == 0:
                        logger.info(f"Migrated {migrated_count} records...", tag="INIT")
                await db.commit()
                logger.success(f"Migration completed. {migrated_count} records processed.", tag="COMPLETE")
        except Exception as e:
            logger.error(
                message="Migration failed: {error}",
                tag="ERROR",
                params={"error": str(e)}
            )
            raise


async def backup_database(db_path: str) -> Optional[str]:
    """Create a timestamped backup of the existing database, returning the
    backup path (or None if there is no database to back up)."""
    if not os.path.exists(db_path):
        logger.info("No existing database found. Skipping backup.", tag="INIT")
        return None

    # Create backup with timestamp
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = f"{db_path}.backup_{timestamp}"
    try:
        # Wait for any potential write operations to finish
        await asyncio.sleep(1)

        # Create backup
        shutil.copy2(db_path, backup_path)
        logger.info(f"Database backup created at: {backup_path}", tag="COMPLETE")
        return backup_path
    except Exception as e:
        logger.error(
            message="Backup failed: {error}",
            tag="ERROR",
            params={"error": str(e)}
        )
        raise


async def run_migration(db_path: Optional[str] = None):
    """Run database migration"""
    if db_path is None:
        db_path = os.path.join(Path.home(), ".crawl4ai", "crawl4ai.db")
    if not os.path.exists(db_path):
        logger.info("No existing database found. Skipping migration.", tag="INIT")
        return

    # Create backup first
    backup_path = await backup_database(db_path)
    if not backup_path:
        return

    migration = DatabaseMigration(db_path)
    await migration.migrate_database()


def main():
    """CLI entry point for migration"""
    import argparse
    parser = argparse.ArgumentParser(description='Migrate Crawl4AI database to file-based storage')
    parser.add_argument('--db-path', help='Custom database path')
    args = parser.parse_args()
    asyncio.run(run_migration(args.db_path))


if __name__ == "__main__":
    main()
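
# Example invocation (the module path is an assumption; adjust to wherever
# this file lives in the package, e.g. crawl4ai/migrations.py). The relative
# import above requires running it as a module, not as a bare script:
#   python -m crawl4ai.migrations --db-path ~/.crawl4ai/crawl4ai.db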