Spaces:
Runtime error
Runtime error
| import os, xml.etree.ElementTree as ET, torch, torch.nn as nn, numpy as np, logging, requests | |
| from collections import defaultdict | |
| from torch.utils.data import DataLoader, Dataset | |
| from transformers import AutoTokenizer, AutoModel | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from accelerate import Accelerator | |
| from tqdm import tqdm | |
# Root logger: INFO level with timestamp / level / message on each record.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Directory used as the Hugging Face cache root.
cache_dir = '/app/cache'
# exist_ok=True avoids the check-then-create race of a separate
# os.path.exists() test and is a no-op when the directory is already there.
os.makedirs(cache_dir, exist_ok=True)
# NOTE(review): this assignment runs after `transformers` has already been
# imported at the top of the file. Hugging Face libraries appear to read
# HF_HOME at import time, so this may be too late to affect the cache
# location -- confirm, and if so move it above the imports.
os.environ['HF_HOME'] = cache_dir
# Echo the value so it shows up in the container logs.
print(f"HF_HOME is set to: {os.environ['HF_HOME']}")
class Config:
    """Namespace of numeric constants exposed as class attributes.

    NOTE(review): the single-letter names are not explained anywhere in the
    visible part of this file and none of the visible code reads them;
    confirm what each one stands for before relying on it.
    """

    E = 512
    H = 32
    N = 1024
    C = 256
    B = 128
    M = 20000
    S = 2048
    V = 1e5
    W = 4000
    L = 2e-4
    D = .15
class MyDataset(Dataset):
    """Dataset that pairs ``data[i]`` with ``labels[i]`` by position."""

    def __init__(self, data, labels):
        """Keep references to the indexable sample and label containers."""
        self.data = data
        self.labels = labels

    def __len__(self):
        """Return the number of samples, taken from ``data``."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(sample, label)`` tuple stored at ``index``."""
        sample = self.data[index]
        target = self.labels[index]
        return sample, target
class MyModel(nn.Module):
    """Linear projection + ReLU, then a single-layer LSTM, then a linear head.

    ``forward`` expects ``x`` shaped ``(batch, seq_len, input_size)`` (the LSTM
    is built with ``batch_first=True`` and the last time step is selected with
    ``out[:, -1, :]``) and returns ``(batch, output_size)``.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Build the layers.

        Args:
            input_size: Feature width of each input time step.
            hidden_size: Width of the projection and of the LSTM state.
            output_size: Width of the returned tensor's last dimension.
        """
        super().__init__()
        # Stored so the value is available on the instance; the original
        # forward() referenced a bare `hidden_size` and raised NameError.
        self.hidden_size = hidden_size
        self.hidden = nn.Linear(input_size, hidden_size)
        # Not used by forward(); kept so existing attribute access keeps working.
        self.output = nn.Linear(hidden_size, output_size)
        # The LSTM consumes the output of `self.hidden`, whose feature width is
        # hidden_size (not input_size), so it must be sized accordingly.
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the head's output for the last time step of each sequence."""
        x = torch.relu(self.hidden(x))
        # Omitting (h_0, c_0) makes nn.LSTM start from zeros on the input's own
        # device and dtype, which is what the manual torch.zeros() call intended.
        out, _ = self.lstm(x)
        return self.fc(out[:, -1, :])
class MemoryNetwork:
    """Fixed-size table of vectors with a per-slot usage score.

    ``memory`` is a ``(memory_size, embedding_size)`` float array and ``usage``
    a ``(memory_size,)`` float array; both start at zero.
    """

    def __init__(self, memory_size, embedding_size):
        """Allocate the zero-filled memory table and usage vector."""
        self.memory = np.zeros((memory_size, embedding_size))
        self.usage = np.zeros(memory_size)

    def store(self, data):
        """Write ``data`` into the slot with the lowest usage and reset it to 1.0."""
        slot = np.argmin(self.usage)
        self.memory[slot] = data
        self.usage[slot] = 1.0

    def retrieve(self, query):
        """Return the row with the largest dot product against ``query``.

        The chosen slot's usage is incremented by 1.0 as a side effect.
        """
        scores = np.dot(self.memory, query)
        best = np.argmax(scores)
        self.usage[best] += 1.0
        return self.memory[best]

    def update_usage(self):
        """Scale every usage score by 0.99, in place."""
        self.usage *= 0.99
class DM(nn.Module):
    """Model assembled from named sections, each a list of layer-spec dicts.

    ``self.s`` is an ``nn.ModuleDict`` mapping section name to an
    ``nn.ModuleList`` with one ``nn.Sequential`` per layer spec.
    """

    def __init__(self, s):
        """Build one ``nn.Sequential`` per layer spec in every section of ``s``."""
        super().__init__()
        sections = {}
        for section_name, layer_specs in s.items():
            built = [self.cl(spec) for spec in layer_specs]
            sections[section_name] = nn.ModuleList(built)
        self.s = nn.ModuleDict(sections)

    def cl(self, lp):
        """Create the ``nn.Sequential`` described by the layer spec ``lp``.

        Keys read from ``lp``: ``input_size`` and ``output_size`` (required),
        ``batch_norm`` (default True), ``activation`` (default ``'relu'``;
        names not listed below add no activation) and ``dropout`` (default
        0.0; a falsy value adds no dropout module).
        """
        out_features = lp['output_size']
        modules = [nn.Linear(lp['input_size'], out_features)]

        if lp.get('batch_norm', True):
            modules.append(nn.BatchNorm1d(out_features))

        # Factories rather than instances so only the requested module is built.
        activation_factories = {
            'relu': lambda: nn.ReLU(inplace=True),
            'tanh': lambda: nn.Tanh(),
            'sigmoid': lambda: nn.Sigmoid(),
            'leaky_relu': lambda: nn.LeakyReLU(negative_slope=0.01, inplace=True),
            'elu': lambda: nn.ELU(alpha=1.0, inplace=True),
        }
        make_activation = activation_factories.get(lp.get('activation', 'relu'))
        if make_activation is not None:
            modules.append(make_activation())

        dropout_p = lp.get('dropout', 0.0)
        if dropout_p:
            modules.append(nn.Dropout(p=dropout_p))

        return nn.Sequential(*modules)

    def forward(self, x, sn=None):
        """Run ``x`` through section ``sn``, or through all sections when ``sn`` is falsy.

        With no section name, the sections are applied one after another in the
        order they are stored in ``self.s``, each feeding the next.
        """
        if sn:
            selected = [self.s[sn]]
        else:
            selected = self.s.values()
        for layers in selected:
            for layer in layers:
                x = layer(x)
        return x
def parse_xml(file_path):
    """Read layer specifications from every ``<layer>`` element below the root.

    Args:
        file_path: Path of the XML file to read.

    Returns:
        A list with one dict per ``<layer>`` element, in document order, with
        keys ``input_size`` (int, default 128), ``output_size`` (int, default
        256) and ``activation`` (lower-cased string, default ``'relu'``).

    Raises:
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
        ValueError: If a size attribute is not an integer literal.
    """
    # Parse once; the original parsed the file twice and discarded one tree.
    root = ET.parse(file_path).getroot()
    return [
        {
            'input_size': int(node.get('input_size', 128)),
            'output_size': int(node.get('output_size', 256)),
            'activation': node.get('activation', 'relu').lower(),
        }
        for node in root.findall('.//layer')
    ]
def create_model_from_folder(folder_path):
    """Build a ``DM`` model from every ``.xml`` file found under ``folder_path``.

    Each directory that contains XML files becomes one section, named after the
    directory's base name with ``.`` replaced by ``_``. The layer specs from all
    of that directory's XML files are concatenated into the section.

    NOTE(review): files are visited in the order ``os.walk`` yields them, so
    the layer order inside a section follows the directory listing order --
    confirm whether a stable (sorted) order is expected.
    """
    sections = defaultdict(list)
    for current_dir, _subdirs, filenames in os.walk(folder_path):
        section_name = os.path.basename(current_dir).replace('.', '_')
        for filename in filenames:
            if not filename.endswith('.xml'):
                continue
            xml_path = os.path.join(current_dir, filename)
            sections[section_name].extend(parse_xml(xml_path))
    return DM(dict(sections))
def create_embeddings_and_sentences(folder_path, model_name="pile-of-law/legalbert-large-1.7M-1"):
    """Embed the text of every element in every ``.xml`` file under ``folder_path``.

    Each non-empty, stripped ``element.text`` is tokenized (with truncation)
    and encoded with the given model; the embedding is the mean of
    ``last_hidden_state`` over the token dimension.

    Args:
        folder_path: Directory tree to scan for ``.xml`` files.
        model_name: Hugging Face model id used for both tokenizer and encoder.

    Returns:
        ``(embeddings, ds)`` where ``embeddings`` is a 2-D NumPy array with one
        row per text and ``ds`` is the list of texts in the same order.

    Raises:
        ValueError: If no non-empty text was found (nothing to stack).
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    encoder = AutoModel.from_pretrained(model_name)
    embeddings, ds = [], []
    for current_dir, _subdirs, filenames in os.walk(folder_path):
        for filename in filenames:
            if not filename.endswith('.xml'):
                continue
            # Parse once; the original parsed every file twice.
            root = ET.parse(os.path.join(current_dir, filename)).getroot()
            for element in root.iter():
                # Strip before testing: indentation-only text nodes are truthy
                # as raw strings and used to be embedded as empty sentences.
                text = (element.text or '').strip()
                if not text:
                    continue
                encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
                with torch.no_grad():
                    pooled = encoder(**encoded).last_hidden_state.mean(dim=1)
                embeddings.append(pooled.numpy())
                ds.append(text)
    if not embeddings:
        # np.vstack([]) would raise the same exception type with an opaque message.
        raise ValueError(f"No non-empty XML text found under {folder_path!r}")
    return np.vstack(embeddings), ds
def query_vector_similarity(query, embeddings, ds, model_name="pile-of-law/legalbert-large-1.7M-2", top_k=5):
    """Return the texts in ``ds`` whose embeddings are most similar to ``query``.

    The query is embedded as the mean of the model's ``last_hidden_state`` over
    the token dimension and compared to ``embeddings`` by cosine similarity.

    Args:
        query: Query text.
        embeddings: 2-D array with one row per entry of ``ds``.
        ds: Texts aligned with the rows of ``embeddings``.
        model_name: Hugging Face model id used to embed the query.
        top_k: Maximum number of results to return (default 5, the previously
            hard-coded value). Values <= 0 yield an empty list.

    Returns:
        Up to ``top_k`` texts, most similar first.

    NOTE(review): the default ``model_name`` here ends in ``-2`` while
    ``create_embeddings_and_sentences`` defaults to the ``-1`` checkpoint.
    Comparing vectors produced by two different models is probably not
    intended -- confirm and pass the same model to both.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    encoder = AutoModel.from_pretrained(model_name)
    encoded = tokenizer(query, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        query_embedding = encoder(**encoded).last_hidden_state.mean(dim=1).numpy()
    scores = cosine_similarity(query_embedding, embeddings)[0]
    # Descending order, then cut; same ranking as argsort()[-k:][::-1].
    ranked = scores.argsort()[::-1][:max(top_k, 0)]
    return [ds[idx] for idx in ranked]
def fetch_courtlistener_data(query):
    """Search NZLII for ``query`` and return the results as a list of dicts.

    Despite the function name, the request goes to the NZLII search endpoint
    (``nzlii.org/cgi-bin/sinosrch.cgi``), not to CourtListener.

    Args:
        query: Free-text search string.

    Returns:
        A list of dicts with keys ``title``, ``citation``, ``date``, ``court``,
        ``summary`` and ``url`` (missing fields become ``""``). An empty list is
        returned when the request fails, the body is not JSON, or the payload
        does not have the expected shape.
    """
    try:
        response = requests.get(
            "https://nzlii.org/cgi-bin/sinosrch.cgi",
            params={"method": "auto", "query": query, "meta": "/nz", "results": "50", "format": "json"},
            headers={"Accept": "application/json"},
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()
    # ValueError covers JSON decoding failures on requests versions where the
    # decode error is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as e:
        logging.error(f"Failed to fetch data from NZLII API: {str(e)}")
        return []

    # A JSON body that is not an object (or has a null/ill-typed "results")
    # would otherwise raise AttributeError/TypeError outside the handler above.
    results = payload.get("results") if isinstance(payload, dict) else None
    if not isinstance(results, list):
        return []
    fields = ("title", "citation", "date", "court", "summary", "url")
    return [
        {field: item.get(field, "") for field in fields}
        for item in results
        if isinstance(item, dict)
    ]
def main():
    """Build the XML-defined model, train it on synthetic data, then run a demo query.

    Steps: create the model and the corpus embeddings from the ``data`` folder,
    train for 10 epochs on random tensors with Adam and cross-entropy loss,
    then log the nearest corpus texts and the remote search results for a
    sample query.
    """
    folder_path = 'data'
    model = create_model_from_folder(folder_path)
    logging.info(f"Created dynamic PyTorch model with sections: {list(model.s.keys())}")
    embeddings, ds = create_embeddings_and_sentences(folder_path)

    accelerator = Accelerator()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    num_epochs = 10

    # Build the synthetic dataset once and hand that same object to the loader;
    # the original created a second, unrelated random dataset for the loader
    # and never used the first.
    # NOTE(review): samples have 10 features and labels span 5 classes, while
    # the model's layer sizes come from the XML files -- confirm they match.
    dataset = MyDataset(torch.randn(1000, 10), torch.randint(0, 5, (1000,)))
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
    model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

    for epoch in range(num_epochs):
        model.train()
        for batch_data, batch_labels in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_data)
            loss = criterion(outputs, batch_labels)
            accelerator.backward(loss)
            optimizer.step()
        # Reports the loss of the last batch of the epoch, not an epoch average.
        logging.info(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

    query = "example query text"
    logging.info(f"Query results: {query_vector_similarity(query, embeddings, ds)}")
    logging.info(f"CourtListener API results: {fetch_courtlistener_data(query)}")


if __name__ == "__main__":
    main()