Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import time | |
| import base64 | |
| import secrets | |
| from dataclasses import dataclass, asdict | |
| from typing import List, Optional, Dict, Any | |
| import streamlit as st | |
| import pandas as pd # for Excel/CSV import | |
| # ---- Crypto deps (install: pip install cryptography argon2-cffi==23.1.0) ---- | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| from cryptography.hazmat.primitives import hashes | |
| from cryptography.hazmat.primitives.kdf.hkdf import HKDF | |
| from cryptography.hazmat.primitives.hmac import HMAC | |
| from cryptography.hazmat.backends import default_backend | |
| from argon2.low_level import hash_secret_raw, Type | |
| APP_TITLE = "SmartPass — Local, Zero‑Knowledge Password Manager" | |
| VERSION = 1 | |
| # -------------------------- Utility helpers -------------------------- | |
| def b64e(b: bytes) -> str: | |
| return base64.b64encode(b).decode("utf-8") | |
| def b64d(s: str) -> bytes: | |
| return base64.b64decode(s.encode("utf-8")) | |
| def hkdf_expand(key_material: bytes, info: bytes, length: int = 32, salt: Optional[bytes] = None) -> bytes: | |
| hkdf = HKDF( | |
| algorithm=hashes.SHA256(), | |
| length=length, | |
| salt=salt, | |
| info=info, | |
| backend=default_backend(), | |
| ) | |
| return hkdf.derive(key_material) | |
| # -------------------------- KDF & Keys -------------------------- | |
| class KDFParams: | |
| algo: str = "argon2id" | |
| salt_b64: str = "" | |
| m_cost_kib: int = 131072 # 128 MiB | |
| t_cost: int = 3 | |
| parallelism: int = 1 | |
| hash_len: int = 32 | |
| def to_dict(self): | |
| d = asdict(self) | |
| return d | |
| class WrappedDataKey: | |
| nonce_b64: str | |
| ct_b64: str | |
| def to_dict(self): | |
| return asdict(self) | |
| class VaultItem: | |
| id: str | |
| type: str # "login" | "note" | |
| enc_blob_b64: str | |
| nonce_b64: str | |
| created_at: float | |
| updated_at: float | |
| def to_dict(self): | |
| return asdict(self) | |
| class Vault: | |
| version: int | |
| kdf: KDFParams | |
| data_key_wrapped: WrappedDataKey | |
| items: List[VaultItem] | |
| integrity_hmac_b64: str | |
| def to_dict(self): | |
| return { | |
| "version": self.version, | |
| "kdf": self.kdf.to_dict(), | |
| "data_key_wrapped": self.data_key_wrapped.to_dict(), | |
| "items": [it.to_dict() for it in self.items], | |
| "integrity_hmac_b64": self.integrity_hmac_b64, | |
| } | |
| # Argon2id derivation → 32B master key bytes | |
| def derive_master_key(password: str, kdf: KDFParams) -> bytes: | |
| if kdf.algo != "argon2id": | |
| raise ValueError("Unsupported KDF") | |
| salt = b64d(kdf.salt_b64) | |
| mk = hash_secret_raw( | |
| secret=password.encode("utf-8"), | |
| salt=salt, | |
| time_cost=kdf.t_cost, | |
| memory_cost=kdf.m_cost_kib, | |
| parallelism=kdf.parallelism, | |
| hash_len=kdf.hash_len, | |
| type=Type.ID, | |
| ) | |
| return mk | |
| # -------------------------- Encryption helpers -------------------------- | |
| def aesgcm_encrypt(key: bytes, plaintext: bytes, aad: Optional[bytes] = None) -> Dict[str, str]: | |
| aes = AESGCM(key) | |
| nonce = secrets.token_bytes(12) | |
| ct = aes.encrypt(nonce, plaintext, aad) | |
| return {"nonce_b64": b64e(nonce), "ct_b64": b64e(ct)} | |
| def aesgcm_decrypt(key: bytes, nonce_b64: str, ct_b64: str, aad: Optional[bytes] = None) -> bytes: | |
| aes = AESGCM(key) | |
| return aes.decrypt(b64d(nonce_b64), b64d(ct_b64), aad) | |
| # -------------------------- Integrity (HMAC) -------------------------- | |
| def canonical_vault_for_hmac(v: Dict[str, Any]) -> bytes: | |
| # Exclude integrity field itself to avoid recursion | |
| tmp = dict(v) | |
| tmp.pop("integrity_hmac_b64", None) | |
| # Stable ordering | |
| return json.dumps(tmp, sort_keys=True, separators=(",", ":")).encode("utf-8") | |
| def compute_hmac(mac_key: bytes, v_dict: Dict[str, Any]) -> str: | |
| h = HMAC(mac_key, hashes.SHA256(), backend=default_backend()) | |
| h.update(canonical_vault_for_hmac(v_dict)) | |
| return b64e(h.finalize()) | |
| def verify_hmac(mac_key: bytes, v_dict: Dict[str, Any]) -> bool: | |
| try: | |
| expected = v_dict.get("integrity_hmac_b64", "") | |
| h = HMAC(mac_key, hashes.SHA256(), backend=default_backend()) | |
| h.update(canonical_vault_for_hmac(v_dict)) | |
| h.verify(b64d(expected)) | |
| return True | |
| except Exception: | |
| return False | |
| # -------------------------- Vault Ops -------------------------- | |
| def new_vault(password: str) -> Vault: | |
| kdf = KDFParams() | |
| kdf.salt_b64 = b64e(secrets.token_bytes(16)) | |
| master_key = derive_master_key(password, kdf) | |
| # Derive subkeys | |
| wrap_key = hkdf_expand(master_key, b"wrap-key") # for wrapping data key | |
| mac_key = hkdf_expand(master_key, b"vault-mac") # for HMAC integrity | |
| data_key = secrets.token_bytes(32) | |
| wrapped = aesgcm_encrypt(wrap_key, data_key, aad=b"SmartPass:data-key") | |
| vault = Vault( | |
| version=VERSION, | |
| kdf=kdf, | |
| data_key_wrapped=WrappedDataKey(**wrapped), | |
| items=[], | |
| integrity_hmac_b64="", | |
| ) | |
| # Compute initial HMAC | |
| vdict = vault.to_dict() | |
| vdict["integrity_hmac_b64"] = "" | |
| hmac_b64 = compute_hmac(mac_key, vdict) | |
| vault.integrity_hmac_b64 = hmac_b64 | |
| # Clear secrets from locals (best-effort) | |
| return vault | |
| def unlock_vault(vault_dict: Dict[str, Any], password: str) -> Dict[str, Any]: | |
| # Returns {"data_key": bytes, "mac_key": bytes} | |
| kdf = KDFParams(**vault_dict["kdf"]) if isinstance(vault_dict["kdf"], dict) else vault_dict["kdf"] | |
| master_key = derive_master_key(password, kdf) | |
| wrap_key = hkdf_expand(master_key, b"wrap-key") | |
| mac_key = hkdf_expand(master_key, b"vault-mac") | |
| # Verify integrity first | |
| if not verify_hmac(mac_key, vault_dict): | |
| raise ValueError("Integrity check failed. The vault may be corrupted or the password is incorrect.") | |
| w = vault_dict["data_key_wrapped"] | |
| data_key = aesgcm_decrypt(wrap_key, w["nonce_b64"], w["ct_b64"], aad=b"SmartPass:data-key") | |
| return {"data_key": data_key, "mac_key": mac_key} | |
| def re_hmac(vault_dict: Dict[str, Any], mac_key: bytes) -> None: | |
| vault_dict["integrity_hmac_b64"] = "" | |
| vault_dict["integrity_hmac_b64"] = compute_hmac(mac_key, vault_dict) | |
| # -------------------------- Item Ops -------------------------- | |
| def encrypt_item(data_key: bytes, payload: Dict[str, Any]) -> VaultItem: | |
| now = time.time() | |
| enc = aesgcm_encrypt(data_key, json.dumps(payload).encode("utf-8")) | |
| return VaultItem( | |
| id=secrets.token_hex(8), | |
| type=payload.get("type", "login"), | |
| enc_blob_b64=enc["ct_b64"], | |
| nonce_b64=enc["nonce_b64"], | |
| created_at=now, | |
| updated_at=now, | |
| ) | |
| def decrypt_item(data_key: bytes, it: VaultItem) -> Dict[str, Any]: | |
| pt = aesgcm_decrypt(data_key, it.nonce_b64, it.enc_blob_b64) | |
| return json.loads(pt.decode("utf-8")) | |
| def update_item(data_key: bytes, it: VaultItem, payload: Dict[str, Any]) -> VaultItem: | |
| enc = aesgcm_encrypt(data_key, json.dumps(payload).encode("utf-8")) | |
| it.enc_blob_b64 = enc["ct_b64"] | |
| it.nonce_b64 = enc["nonce_b64"] | |
| it.updated_at = time.time() | |
| return it | |
| # -------------------------- Streamlit UI -------------------------- | |
| st.set_page_config(page_title=APP_TITLE, page_icon="🔐", layout="wide") | |
| if "vault" not in st.session_state: | |
| st.session_state.vault: Optional[Dict[str, Any]] = None | |
| if "unlocked" not in st.session_state: | |
| st.session_state.unlocked = False | |
| if "keys" not in st.session_state: | |
| st.session_state.keys = None # {data_key, mac_key} | |
| if "last_active" not in st.session_state: | |
| st.session_state.last_active = time.time() | |
| def touch(): | |
| st.session_state.last_active = time.time() | |
| def auto_lock(minutes: int): | |
| if st.session_state.unlocked: | |
| if time.time() - st.session_state.last_active > minutes * 60: | |
| st.warning("Auto-locked due to inactivity.") | |
| do_lock() | |
| def do_lock(): | |
| st.session_state.unlocked = False | |
| st.session_state.keys = None | |
| touch() | |
| # Sidebar: Create / Open / Lock / Export | |
| # -------------------------- Import helpers -------------------------- | |
| def _read_tabular(file) -> pd.DataFrame: | |
| name = (file.name or "").lower() | |
| if name.endswith(".csv"): | |
| return pd.read_csv(file) | |
| # Excel requires openpyxl (installed implicitly by pandas if not, pip install openpyxl) | |
| return pd.read_excel(file) | |
| def _standardize_columns(df: pd.DataFrame) -> pd.DataFrame: | |
| # Map common column names to our schema | |
| aliases = { | |
| "name": ["name", "title", "site", "account"], | |
| "username": ["username", "user", "login", "email"], | |
| "url": ["url", "link", "website"], | |
| "password": ["password", "pass", "pwd"], | |
| "note": ["note", "notes", "remark", "remarks"], | |
| "tags": ["tags", "label", "labels", "category", "categories"], | |
| "type": ["type", "item_type"], | |
| } | |
| colmap: Dict[str, str] = {} | |
| lower_cols = {c.lower().strip(): c for c in df.columns} | |
| for target, alias_list in aliases.items(): | |
| for a in alias_list: | |
| if a in lower_cols: | |
| colmap[target] = lower_cols[a] | |
| break | |
| # Ensure all expected columns exist; create if missing | |
| for col in ["name", "username", "url", "password", "note", "tags", "type"]: | |
| if col not in colmap: | |
| df[col] = "" | |
| colmap[col] = col | |
| return df[[colmap[c] for c in ["name", "username", "url", "password", "note", "tags", "type"]]]\ | |
| .rename(columns={colmap[c]: c for c in colmap}) | |
| with st.sidebar: | |
| st.title("🔐 SmartPass") | |
| st.caption("Local, file-based, zero-knowledge vault. No servers.") | |
| # Auto-lock | |
| lock_minutes = st.number_input("Auto-lock (minutes)", min_value=1, max_value=120, value=5) | |
| if st.session_state.unlocked: | |
| if st.button("Lock Now", use_container_width=True): | |
| do_lock() | |
| auto_lock(lock_minutes) | |
| st.divider() | |
| st.subheader("Open Vault") | |
| up = st.file_uploader("Upload existing vault (.json)", type=["json"], accept_multiple_files=False) | |
| password_open = st.text_input("Master password", type="password") | |
| if st.button("Unlock", use_container_width=True): | |
| if up is None: | |
| st.error("Please upload a vault file.") | |
| elif not password_open: | |
| st.error("Please enter your master password.") | |
| else: | |
| try: | |
| vault_dict = json.loads(up.getvalue().decode("utf-8")) | |
| keys = unlock_vault(vault_dict, password_open) | |
| st.session_state.vault = vault_dict | |
| st.session_state.keys = keys | |
| st.session_state.unlocked = True | |
| touch() | |
| st.success("Vault unlocked.") | |
| except Exception as e: | |
| st.session_state.unlocked = False | |
| st.error(f"Failed to unlock: {e}") | |
| st.divider() | |
| st.subheader("Create New Vault") | |
| new_pw = st.text_input("New master password", type="password") | |
| new_pw2 = st.text_input("Confirm password", type="password") | |
| with st.expander("Advanced KDF (Argon2id)"): | |
| m_mib = st.slider("Memory (MiB)", 64, 512, 128, step=32) | |
| t_cost = st.slider("Iterations", 1, 5, 3) | |
| par = st.slider("Parallelism", 1, 4, 1) | |
| if st.button("Create Vault", use_container_width=True): | |
| if not new_pw: | |
| st.error("Master password required.") | |
| elif new_pw != new_pw2: | |
| st.error("Passwords do not match.") | |
| else: | |
| v = new_vault(new_pw) | |
| # override KDF knobs from UI | |
| v.kdf.m_cost_kib = m_mib * 1024 | |
| v.kdf.t_cost = t_cost | |
| v.kdf.parallelism = par | |
| # Recompute HMAC with same master pw but updated kdf params | |
| mk = derive_master_key(new_pw, v.kdf) | |
| mac_key = hkdf_expand(mk, b"vault-mac") | |
| vdict = v.to_dict() | |
| vdict["integrity_hmac_b64"] = "" | |
| vdict["integrity_hmac_b64"] = compute_hmac(mac_key, vdict) | |
| st.session_state.vault = vdict | |
| st.session_state.keys = unlock_vault(vdict, new_pw) | |
| st.session_state.unlocked = True | |
| touch() | |
| st.success("New vault created and unlocked.") | |
| st.divider() | |
| st.subheader("Export") | |
| if st.session_state.vault is not None: | |
| export_json = json.dumps(st.session_state.vault, separators=(",", ":")) | |
| st.download_button( | |
| label="Download vault JSON", | |
| data=export_json, | |
| file_name="vault.smartpass.json", | |
| mime="application/json", | |
| use_container_width=True, | |
| ) | |
| st.divider() | |
| st.subheader("Import from Excel/CSV") | |
| imp = st.file_uploader("Upload creds file (.xlsx/.xls/.csv)", type=["xlsx","xls","csv"], accept_multiple_files=False, key="importer") | |
| if imp is not None and st.session_state.unlocked: | |
| try: | |
| df = _read_tabular(imp) | |
| df = _standardize_columns(df) | |
| # Default type to 'login' if empty | |
| df['type'] = df['type'].fillna('').astype(str).str.lower().replace({'': 'login'}) | |
| added = 0 | |
| items_local: List[VaultItem] = [VaultItem(**it) for it in st.session_state.vault.get("items", [])] | |
| for _, row in df.iterrows(): | |
| payload = { | |
| "type": (row.get('type') or 'login') if (row.get('type') in ['login','note']) else 'login', | |
| "name": str(row.get('name') or '').strip(), | |
| "username": str(row.get('username') or '').strip(), | |
| "url": str(row.get('url') or '').strip(), | |
| "password": str(row.get('password') or '').strip(), | |
| "note": str(row.get('note') or '').strip(), | |
| "tags": [t.strip() for t in str(row.get('tags') or '').split(',') if t.strip()], | |
| } | |
| # Skip empty rows | |
| if not any(payload.values()): | |
| continue | |
| items_local.append(encrypt_item(st.session_state.keys["data_key"], payload)) | |
| added += 1 | |
| st.session_state.vault["items"] = [it.to_dict() for it in items_local] | |
| re_hmac(st.session_state.vault, st.session_state.keys["mac_key"]) | |
| st.success(f"Imported {added} item(s).") | |
| st.experimental_rerun() | |
| except Exception as e: | |
| st.error(f"Import failed: {e}") | |
| # -------------------------- Main Area -------------------------- | |
| st.title(APP_TITLE) | |
| if not st.session_state.unlocked: | |
| st.info("Unlock or create a vault from the left sidebar to begin.") | |
| st.stop() | |
| # Search and actions | |
| col1, col2, col3, col4 = st.columns([3, 1, 1, 1]) | |
| with col1: | |
| q = st.text_input("Search (name / username / url / tags)", value="") | |
| with col2: | |
| if st.button("Add Login"): | |
| st.session_state["add_type"] = "login" | |
| with col3: | |
| if st.button("Add Secure Note"): | |
| st.session_state["add_type"] = "note" | |
| with col4: | |
| if st.button("Lock"): | |
| do_lock() | |
| st.experimental_rerun() | |
| touch() | |
| # Items table | |
| vault_dict = st.session_state.vault | |
| keys = st.session_state.keys | |
| # Decrypt all (in-memory only) | |
| items: List[VaultItem] = [VaultItem(**it) for it in vault_dict.get("items", [])] | |
| rows = [] | |
| for it in items: | |
| try: | |
| payload = decrypt_item(keys["data_key"], it) | |
| rows.append({ | |
| "_id": it.id, | |
| "type": it.type, | |
| "name": payload.get("name", ""), | |
| "username": payload.get("username", ""), | |
| "url": payload.get("url", ""), | |
| "password": payload.get("password", "") if it.type == "login" else "", | |
| "note": payload.get("note", "") if it.type == "note" else "", | |
| "tags": ", ".join(payload.get("tags", [])), | |
| "updated": time.strftime('%Y-%m-%d %H:%M', time.localtime(it.updated_at)), | |
| }) | |
| except Exception: | |
| rows.append({"_id": it.id, "type": it.type, "name": "<decrypt error>", "username": "", "url": "", "password": "", "note": "", "tags": "", "updated": ""}) | |
| # Filter | |
| if q.strip(): | |
| Q = q.lower() | |
| rows = [r for r in rows if any(Q in (str(r[k]) or "").lower() for k in ["name", "username", "url", "tags", "note"]) ] | |
| st.caption(f"Items: {len(rows)}") | |
| # Render items | |
| for r in rows: | |
| with st.expander(f"{r['name'] or '(unnamed)'} — {r['username']} [{r['type']}] • updated {r['updated']}"): | |
| c1, c2 = st.columns([2, 1]) | |
| with c1: | |
| st.write(f"**URL:** {r['url']}") | |
| if r['type'] == 'login': | |
| st.text_input("Password", value=r['password'], type="password", key=f"pw_{r['_id']}") | |
| if r['type'] == 'note': | |
| st.text_area("Note", value=r['note'], height=100, key=f"note_{r['_id']}") | |
| st.write(f"**Tags:** {r['tags']}") | |
| with c2: | |
| if st.button("Edit", key=f"edit_{r['_id']}"): | |
| st.session_state["edit_id"] = r['_id'] | |
| st.session_state["edit_payload"] = r | |
| if st.button("Delete", key=f"del_{r['_id']}"): | |
| items = [it for it in items if it.id != r['_id']] | |
| vault_dict["items"] = [it.to_dict() for it in items] | |
| re_hmac(vault_dict, keys["mac_key"]) | |
| st.session_state.vault = vault_dict | |
| st.success("Deleted.") | |
| st.experimental_rerun() | |
| # Add / Edit forms | |
| if st.session_state.get("add_type"): | |
| with st.modal("Add Item"): | |
| add_type = st.session_state.get("add_type") | |
| name = st.text_input("Name") | |
| username = st.text_input("Username") if add_type == "login" else "" | |
| url = st.text_input("URL") if add_type == "login" else "" | |
| password = st.text_input("Password", type="password") if add_type == "login" else "" | |
| note = st.text_area("Secure Note", height=120) if add_type == "note" else "" | |
| tags = st.text_input("Tags (comma-separated)") | |
| if st.button("Save"): | |
| payload = { | |
| "type": add_type, | |
| "name": name, | |
| "username": username, | |
| "url": url, | |
| "password": password, | |
| "note": note, | |
| "tags": [t.strip() for t in tags.split(",") if t.strip()], | |
| } | |
| new_item = encrypt_item(keys["data_key"], payload) | |
| items.append(new_item) | |
| vault_dict["items"] = [it.to_dict() for it in items] | |
| re_hmac(vault_dict, keys["mac_key"]) | |
| st.session_state.vault = vault_dict | |
| st.session_state["add_type"] = None | |
| st.success("Item added.") | |
| st.experimental_rerun() | |
| if st.button("Cancel"): | |
| st.session_state["add_type"] = None | |
| if st.session_state.get("edit_id"): | |
| with st.modal("Edit Item"): | |
| eid = st.session_state.get("edit_id") | |
| original = next((it for it in items if it.id == eid), None) | |
| payload = st.session_state.get("edit_payload", {}) | |
| etype = payload.get("type", "login") | |
| name = st.text_input("Name", value=payload.get("name", "")) | |
| username = st.text_input("Username", value=payload.get("username", "")) if etype == "login" else "" | |
| url = st.text_input("URL", value=payload.get("url", "")) if etype == "login" else "" | |
| password = st.text_input("Password", value=payload.get("password", ""), type="password") if etype == "login" else "" | |
| note = st.text_area("Secure Note", value=payload.get("note", ""), height=120) if etype == "note" else "" | |
| tags = st.text_input("Tags (comma-separated)", value=payload.get("tags", "")) | |
| if st.button("Save Changes"): | |
| new_payload = { | |
| "type": etype, | |
| "name": name, | |
| "username": username, | |
| "url": url, | |
| "password": password, | |
| "note": note, | |
| "tags": [t.strip() for t in tags.split(",") if t.strip()], | |
| } | |
| updated = update_item(keys["data_key"], original, new_payload) | |
| # Replace in list | |
| items = [updated if it.id == eid else it for it in items] | |
| vault_dict["items"] = [it.to_dict() for it in items] | |
| re_hmac(vault_dict, keys["mac_key"]) | |
| st.session_state.vault = vault_dict | |
| st.session_state["edit_id"] = None | |
| st.success("Updated.") | |
| st.experimental_rerun() | |
| if st.button("Cancel"): | |
| st.session_state["edit_id"] = None | |
| # Security footnotes | |
| with st.expander("Security Notes & Tips"): | |
| st.markdown( | |
| """ | |
| - **All encryption/decryption happens locally.** Your vault is a JSON file that only contains ciphertext. Keep backups. | |
| - **Zero-knowledge:** The app never stores your master password. Only a key derived via **Argon2id** is used transiently in memory. | |
| - **Integrity:** The vault includes an HMAC to detect tampering/corruption. Wrong password also fails integrity. | |
| - **Clipboard caution:** This demo does not auto-copy passwords to avoid OS clipboard risks. | |
| - **KDF tuning:** If your device is slow, reduce memory/iterations in the sidebar's Advanced KDF. | |
| """ | |
| ) | |