import os
import logging
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import List, Dict, Any, Optional

# Set the cache directory path and create it if it doesn't exist; the
# environment variable must be in place before transformers is imported,
# otherwise the default (possibly non-writable) cache location is used.
cache_dir = '/app/cache'
os.makedirs(cache_dir, exist_ok=True)
os.environ['TRANSFORMERS_CACHE'] = cache_dir
print(f"TRANSFORMERS_CACHE is set to: {os.environ['TRANSFORMERS_CACHE']}")

import numpy as np
import requests
import termcolor
import torch
import torch.nn as nn
import torch.nn.functional as F
from accelerate import Accelerator
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics.pairwise import cosine_similarity

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class DM(nn.Module):
    def __init__(self, s: Dict[str, List[Dict[str, Any]]]):
        super(DM, self).__init__()
        self.s = nn.ModuleDict()
        if not s:
            s = {'default': [{'input_size': 128, 'output_size': 256, 'activation': 'relu', 'batch_norm': True, 'dropout': 0.1}]}
        for sn, l in s.items():
            self.s[sn] = nn.ModuleList()
            for lp in l:
                logging.info(f"Creating layer in section '{sn}' with params: {lp}")
                self.s[sn].append(self.cl(lp))
    def cl(self, lp: Dict[str, Any]) -> nn.Module:
        l = [nn.Linear(lp['input_size'], lp['output_size'])]
        if lp.get('batch_norm', True): l.append(nn.BatchNorm1d(lp['output_size']))
        a = lp.get('activation', 'relu')
        if a == 'relu': l.append(nn.ReLU(inplace=True))
        elif a == 'tanh': l.append(nn.Tanh())
        elif a == 'sigmoid': l.append(nn.Sigmoid())
        elif a == 'leaky_relu': l.append(nn.LeakyReLU(negative_slope=0.01, inplace=True))
        elif a == 'elu': l.append(nn.ELU(alpha=1.0, inplace=True))
        elif a not in (None, 'none'): raise ValueError(f"Unsupported activation function: {a}")
        if dr := lp.get('dropout', 0.0): l.append(nn.Dropout(p=dr))
        if hl := lp.get('hidden_layers', []):
            for hlp in hl: l.append(self.cl(hlp))
        if lp.get('memory_augmentation', True): l.append(MAL(lp['output_size']))
        if lp.get('hybrid_attention', True): l.append(HAL(lp['output_size']))
        if lp.get('dynamic_flash_attention', True): l.append(DFAL(lp['output_size']))
        return nn.Sequential(*l)
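    # A hypothetical parameter dict that cl() accepts; the keys mirror the
    # lookups above, but the concrete values are illustrative only:
    #   {'input_size': 128, 'output_size': 256, 'activation': 'leaky_relu',
    #    'batch_norm': True, 'dropout': 0.2,
    #    'hidden_layers': [{'input_size': 256, 'output_size': 256, 'activation': 'relu'}],
    #    'memory_augmentation': True, 'hybrid_attention': False, 'dynamic_flash_attention': False}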
    def forward(self, x: torch.Tensor, sn: Optional[str] = None) -> torch.Tensor:
        if sn is not None:
            if sn not in self.s: raise KeyError(f"Section '{sn}' not found in model")
            for l in self.s[sn]: x = l(x)
        else:
            for layers in self.s.values():
                for l in layers: x = l(x)
        return x
class MAL(nn.Module):
    def __init__(self, s: int):
        super(MAL, self).__init__()
        self.m = nn.Parameter(torch.randn(s))
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x + self.m
class HAL(nn.Module):
    def __init__(self, s: int):
        super(HAL, self).__init__()
        # batch_first=True so the (batch, seq, embed) layout produced below is
        # interpreted correctly instead of attending across the batch dimension
        self.a = nn.MultiheadAttention(s, num_heads=8, batch_first=True)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x.unsqueeze(1)
        ao, _ = self.a(x, x, x)
        return ao.squeeze(1)
class DFAL(nn.Module):
    def __init__(self, s: int):
        super(DFAL, self).__init__()
        # batch_first=True for the same reason as in HAL
        self.a = nn.MultiheadAttention(s, num_heads=8, batch_first=True)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x.unsqueeze(1)
        ao, _ = self.a(x, x, x)
        return ao.squeeze(1)
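# Sketch of building DM directly from a sections dict (the section name and
# sizes are illustrative, not a shipped configuration):
#   model = DM({'intro': [{'input_size': 128, 'output_size': 256, 'activation': 'relu'}]})
#   y = model(torch.randn(4, 128), sn='intro')   # -> shape (4, 256)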
def px(file_path: str) -> List[Dict[str, Any]]:
    t = ET.parse(file_path)
    r = t.getroot()
    l = []
    for ly in r.findall('.//layer'):
        lp = {'input_size': int(ly.get('input_size', 128)), 'output_size': int(ly.get('output_size', 256)), 'activation': ly.get('activation', 'relu').lower()}
        # Accept the same activations that cl() can actually build.
        if lp['activation'] not in ['relu', 'tanh', 'sigmoid', 'leaky_relu', 'elu', 'none']:
            raise ValueError(f"Unsupported activation function: {lp['activation']}")
        if lp['input_size'] <= 0 or lp['output_size'] <= 0:
            raise ValueError("Layer dimensions must be positive integers")
        l.append(lp)
    if not l:
        l.append({'input_size': 128, 'output_size': 256, 'activation': 'relu'})
    return l
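# A hypothetical XML file that px() would accept -- path and values are
# illustrative; px() reads the input_size/output_size/activation attributes of
# every <layer> element it finds:
#
#   <!-- data/contracts/layers.xml -->
#   <model>
#       <layer input_size="128" output_size="256" activation="relu"/>
#       <layer input_size="256" output_size="64" activation="tanh"/>
#   </model>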
def cmf(folder_path: str) -> DM:
    s = defaultdict(list)
    if not os.path.exists(folder_path):
        logging.warning(f"Folder {folder_path} does not exist. Creating model with default configuration.")
        return DM({})
    xf = False  # tracks whether at least one XML file was found
    for r, d, f in os.walk(folder_path):
        for file in f:
            if file.endswith('.xml'):
                xf = True
                fp = os.path.join(r, file)
                try:
                    l = px(fp)
                    sn = os.path.basename(r).replace('.', '_')
                    s[sn].extend(l)
                except Exception as e:
                    logging.error(f"Error processing {fp}: {str(e)}")
    if not xf:
        logging.warning("No XML files found. Creating model with default configuration.")
        return DM({})
    return DM(dict(s))
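# Sketch of the folder layout cmf() walks (folder names are assumptions, not
# shipped data): every sub-folder containing XML becomes one section, named
# after the folder with dots replaced by underscores.
#
#   data/
#   |-- contracts/layers.xml    -> section 'contracts'
#   |-- torts.v2/layers.xml     -> section 'torts_v2'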
def ceas(folder_path: str, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
    t = AutoTokenizer.from_pretrained(model_name)
    m = AutoModel.from_pretrained(model_name)
    embeddings = []
    ds = []
    for r, d, f in os.walk(folder_path):
        for file in f:
            if file.endswith('.xml'):
                fp = os.path.join(r, file)
                try:
                    tree = ET.parse(fp)
                    root = tree.getroot()
                    for el in root.iter():
                        if el.text:
                            text = el.text.strip()
                            if not text:
                                continue
                            i = t(text, return_tensors="pt", truncation=True, padding=True)
                            with torch.no_grad():
                                emb = m(**i).last_hidden_state.mean(dim=1).numpy()
                            embeddings.append(emb)
                            ds.append(text)
                except Exception as e:
                    logging.error(f"Error processing {fp}: {str(e)}")
    if not embeddings:
        # Avoid np.vstack on an empty list when no text was embedded.
        return np.empty((0, m.config.hidden_size)), ds
    embeddings = np.vstack(embeddings)
    return embeddings, ds
def qvs(query: str, embeddings, ds, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
    t = AutoTokenizer.from_pretrained(model_name)
    m = AutoModel.from_pretrained(model_name)
    i = t(query, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        qe = m(**i).last_hidden_state.mean(dim=1).numpy()
    similarities = cosine_similarity(qe, embeddings)
    top_k_indices = similarities[0].argsort()[-5:][::-1]
    return [ds[i] for i in top_k_indices]
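# Minimal usage sketch for the retrieval pair above (the folder and query are
# illustrative):
#   embeddings, texts = ceas('data')
#   top_matches = qvs('termination for breach of contract', embeddings, texts)
#   for match in top_matches: print(match)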
def fetch_nzlii_data(query: str) -> List[Dict[str, Any]]:
    base_url = "https://nzlii.org/cgi-bin/sinosrch.cgi"
    params = {
        "method": "auto",
        "query": query,
        "meta": "/nz",
        "mask_path": "",
        "results": "50",
        "format": "json"
    }
    try:
        response = requests.get(base_url, params=params, headers={"Accept": "application/json"}, timeout=10)
        response.raise_for_status()
        results = response.json().get("results", [])
        processed_results = []
        for result in results:
            processed_results.append({
                "title": result.get("title", ""),
                "citation": result.get("citation", ""),
                "date": result.get("date", ""),
                "court": result.get("court", ""),
                "summary": result.get("summary", ""),
                "url": result.get("url", "")
            })
        return processed_results
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to fetch data from NZLII API: {str(e)}")
        return []
    except ValueError as e:
        logging.error(f"Failed to parse NZLII API response: {str(e)}")
        return []
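# Usage sketch (the query is illustrative; the result fields depend on the
# NZLII endpoint actually returning JSON for these parameters, which is an
# assumption of this code):
#   cases = fetch_nzlii_data("unfair dismissal")
#   for case in cases: print(case.get("title"), case.get("url"))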
def main():
    fp = 'data'
    m = cmf(fp)
    logging.info(f"Created dynamic PyTorch model with sections: {list(m.s.keys())}")
    fs = next(iter(m.s.keys()))
    fl = m.s[fs][0]
    ife = fl[0].in_features
    si = torch.randn(1, ife)
    m.eval()  # BatchNorm1d cannot run in training mode on a single-sample batch
    with torch.no_grad():
        so = m(si)
    logging.info(f"Sample output shape: {so.shape}")
    embeddings, ds = ceas(fp)
    a = Accelerator()
    opt = torch.optim.Adam(m.parameters(), lr=0.001)
    c = nn.CrossEntropyLoss()
    ne = 10
    d = torch.utils.data.TensorDataset(torch.randn(100, ife), torch.randint(0, 2, (100,)))
    td = torch.utils.data.DataLoader(d, batch_size=16, shuffle=True)
    m, opt, td = a.prepare(m, opt, td)
    for e in range(ne):
        m.train()
        tl = 0
        for bi, (inputs, labels) in enumerate(td):
            opt.zero_grad()
            outputs = m(inputs)
            loss = c(outputs, labels)
            a.backward(loss)
            opt.step()
            tl += loss.item()
        al = tl / len(td)
        logging.info(f"Epoch {e+1}/{ne}, Average Loss: {al:.4f}")
    uq = "example query text"
    r = qvs(uq, embeddings, ds)
    logging.info(f"Query results: {r}")
    nz_data = fetch_nzlii_data(uq)
    logging.info(f"NZLII API results: {nz_data}")
if __name__ == "__main__":
    main()