import os
import xml.etree.ElementTree as ET
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Dict, Any, Optional
from collections import defaultdict
from accelerate import Accelerator


class DynamicModel(nn.Module):
    def __init__(self, sections: Dict[str, List[Dict[str, Any]]]):
        """
        Initialize the DynamicModel with configurable neural network sections.

        Args:
            sections (Dict[str, List[Dict[str, Any]]]): Dictionary mapping section names to lists of layer configurations.
                Each layer configuration is a dictionary containing:
                    - input_size (int): Size of input features
                    - output_size (int): Size of output features
                    - activation (str, optional): Activation function name ('relu', 'tanh', 'sigmoid', etc.)
                    - dropout (float, optional): Dropout rate
                    - batch_norm (bool, optional): Whether to use batch normalization
                    - hidden_layers (List[Dict[str, Any]], optional): List of hidden layer configurations
                    - memory_augmentation (bool, optional): Whether to add a memory augmentation layer
                    - hybrid_attention (bool, optional): Whether to add a hybrid attention layer
                    - dynamic_flash_attention (bool, optional): Whether to add a dynamic flash attention layer

        Example:
            sections = {
                'encoder': [
                    {'input_size': 128, 'output_size': 256, 'activation': 'relu', 'batch_norm': True},
                    {'input_size': 256, 'output_size': 512, 'activation': 'leaky_relu', 'dropout': 0.1}
                ],
                'decoder': [
                    {'input_size': 512, 'output_size': 256, 'activation': 'elu'},
                    {'input_size': 256, 'output_size': 128, 'activation': 'tanh'}
                ]
            }
        """
        super(DynamicModel, self).__init__()
        self.sections = nn.ModuleDict()

        # Fall back to a default section configuration if none is provided
        if not sections:
            sections = {
                'default': [{
                    'input_size': 128,
                    'output_size': 256,
                    'activation': 'relu',
                    'batch_norm': True,
                    'dropout': 0.1
                }]
            }

        # Initialize each section with its layer configurations
        for section_name, layers in sections.items():
            self.sections[section_name] = nn.ModuleList()
            for layer_params in layers:
                self.sections[section_name].append(self.create_layer(layer_params))

    def create_layer(self, layer_params: Dict[str, Any]) -> nn.Module:
        """
        Creates a neural network layer based on provided parameters.

        Args:
            layer_params (Dict[str, Any]): Dictionary containing layer configuration
                Required keys:
                    - input_size (int): Size of input features
                    - output_size (int): Size of output features
                Optional keys:
                    - activation (str): Activation function name ('relu', 'tanh', 'sigmoid', None)
                    - dropout (float): Dropout rate if needed
                    - batch_norm (bool): Whether to use batch normalization
                    - hidden_layers (List[Dict[str, Any]]): List of hidden layer configurations
                    - memory_augmentation (bool): Whether to add a memory augmentation layer
                    - hybrid_attention (bool): Whether to add a hybrid attention layer
                    - dynamic_flash_attention (bool): Whether to add a dynamic flash attention layer

        Returns:
            nn.Module: Configured neural network layer with activation

        Raises:
            KeyError: If required parameters are missing
            ValueError: If activation function is not supported
        """
        layers = []

        # Add linear layer
        layers.append(nn.Linear(layer_params['input_size'], layer_params['output_size']))

        # Add batch normalization if specified
        if layer_params.get('batch_norm', False):
            layers.append(nn.BatchNorm1d(layer_params['output_size']))

        # Add activation function
        activation = layer_params.get('activation', 'relu')
        if activation == 'relu':
            layers.append(nn.ReLU(inplace=True))
        elif activation == 'tanh':
            layers.append(nn.Tanh())
        elif activation == 'sigmoid':
            layers.append(nn.Sigmoid())
        elif activation == 'leaky_relu':
            layers.append(nn.LeakyReLU(negative_slope=0.01, inplace=True))
        elif activation == 'elu':
            layers.append(nn.ELU(alpha=1.0, inplace=True))
        elif activation is not None:
            raise ValueError(f"Unsupported activation function: {activation}")

        # Add dropout if specified
        if dropout_rate := layer_params.get('dropout', 0.0):
            layers.append(nn.Dropout(p=dropout_rate))

        # Add hidden layers if specified
        if hidden_layers := layer_params.get('hidden_layers', []):
            for hidden_layer_params in hidden_layers:
                layers.append(self.create_layer(hidden_layer_params))

        # Add memory augmentation layer if specified
        if layer_params.get('memory_augmentation', False):
            layers.append(MemoryAugmentationLayer(layer_params['output_size']))

        # Add hybrid attention layer if specified
        if layer_params.get('hybrid_attention', False):
            layers.append(HybridAttentionLayer(layer_params['output_size']))

        # Add dynamic flash attention layer if specified
        if layer_params.get('dynamic_flash_attention', False):
            layers.append(DynamicFlashAttentionLayer(layer_params['output_size']))

        return nn.Sequential(*layers)
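
    # Illustrative create_layer call covering the optional flags (values here are
    # assumptions chosen to satisfy the constraints in the code, e.g. output_size
    # divisible by the attention head count):
    #   block = self.create_layer({'input_size': 256, 'output_size': 256,
    #                              'activation': 'relu', 'batch_norm': True,
    #                              'hybrid_attention': True})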

    def forward(self, x: torch.Tensor, section_name: Optional[str] = None) -> torch.Tensor:
        """
        Forward pass through the dynamic model architecture.

        Args:
            x (torch.Tensor): Input tensor to process
            section_name (Optional[str]): Specific section to process. If None, processes all sections sequentially

        Returns:
            torch.Tensor: Processed output tensor

        Raises:
            KeyError: If specified section_name doesn't exist
        """
        if section_name is not None:
            if section_name not in self.sections:
                raise KeyError(f"Section '{section_name}' not found in model")
            for layer in self.sections[section_name]:
                x = layer(x)
        else:
            # Process sections in insertion order; adjacent sections must have compatible dimensions
            for layers in self.sections.values():
                for layer in layers:
                    x = layer(x)
        return x
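
# Illustrative usage sketch ('encoder' is only an example section key, not a
# name required by the model):
#   model = DynamicModel({'encoder': [{'input_size': 128, 'output_size': 256}]})
#   encoded = model(torch.randn(4, 128), section_name='encoder')  # one section
#   full = model(torch.randn(4, 128))                             # all sections in order
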
class MemoryAugmentationLayer(nn.Module):
    def __init__(self, size: int):
        super(MemoryAugmentationLayer, self).__init__()
        self.memory = nn.Parameter(torch.randn(size))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Broadcast the learned memory vector across the batch dimension
        return x + self.memory


class HybridAttentionLayer(nn.Module):
    def __init__(self, size: int):
        super(HybridAttentionLayer, self).__init__()
        # batch_first=True so the unsqueezed dim in forward() is the sequence dim;
        # note that size must be divisible by num_heads
        self.attention = nn.MultiheadAttention(size, num_heads=8, batch_first=True)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x.unsqueeze(1)  # (batch, 1, size): add sequence dimension
        attn_output, _ = self.attention(x, x, x)
        return attn_output.squeeze(1)


class DynamicFlashAttentionLayer(nn.Module):
    def __init__(self, size: int):
        super(DynamicFlashAttentionLayer, self).__init__()
        self.attention = nn.MultiheadAttention(size, num_heads=8, batch_first=True)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x.unsqueeze(1)  # (batch, 1, size): add sequence dimension
        attn_output, _ = self.attention(x, x, x)
        return attn_output.squeeze(1)


def parse_xml_file(file_path: str) -> List[Dict[str, Any]]:
    """
    Parses an XML configuration file to extract layer parameters for neural network construction.

    Args:
        file_path (str): Path to the XML configuration file

    Returns:
        List[Dict[str, Any]]: List of dictionaries containing layer configurations

    Raises:
        ET.ParseError: If XML file is malformed
        ValueError: If attribute values are invalid
    """
    tree = ET.parse(file_path)
    root = tree.getroot()

    layers = []
    for layer in root.findall('.//layer'):
        layer_params = {}
        layer_params['input_size'] = int(layer.get('input_size', 128))
        layer_params['output_size'] = int(layer.get('output_size', 256))
        layer_params['activation'] = layer.get('activation', 'relu').lower()

        # Validate activation function against those supported by create_layer
        if layer_params['activation'] not in ['relu', 'tanh', 'sigmoid', 'leaky_relu', 'elu', 'none']:
            raise ValueError(f"Unsupported activation function: {layer_params['activation']}")
        if layer_params['activation'] == 'none':
            layer_params['activation'] = None

        # Validate dimensions
        if layer_params['input_size'] <= 0 or layer_params['output_size'] <= 0:
            raise ValueError("Layer dimensions must be positive integers")

        layers.append(layer_params)

    if not layers:
        # Fall back to a default configuration if no layers were found
        layers.append({
            'input_size': 128,
            'output_size': 256,
            'activation': 'relu'
        })
    return layers
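
# Sketch of the XML layout parse_xml_file expects. The <layer> element and its
# attributes follow the parsing code above; the file name and wrapper element
# are assumptions for illustration:
#   <!-- data/encoder/layers.xml -->
#   <layers>
#       <layer input_size="128" output_size="256" activation="relu"/>
#       <layer input_size="256" output_size="256" activation="leaky_relu"/>
#   </layers>
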

def create_model_from_folder(folder_path: str) -> DynamicModel:
    """
    Creates a DynamicModel instance by parsing XML files in the specified folder structure.

    Each subfolder represents a model section, and XML files within contain layer configurations.
    The function recursively walks through the folder structure, processing all XML files to build
    the model architecture. If the folder is missing or contains no XML files, a model with the
    default configuration is returned instead.

    Args:
        folder_path (str): Path to the root folder containing XML configuration files

    Returns:
        DynamicModel: A configured neural network model based on the XML specifications
    """
    sections = defaultdict(list)

    if not os.path.exists(folder_path):
        print(f"Warning: Folder {folder_path} does not exist. Creating model with default configuration.")
        return DynamicModel({})

    xml_files_found = False
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.xml'):
                xml_files_found = True
                file_path = os.path.join(root, file)
                try:
                    layers = parse_xml_file(file_path)
                    # The enclosing folder name becomes the section name
                    section_name = os.path.basename(root)
                    sections[section_name].extend(layers)
                except Exception as e:
                    print(f"Error processing {file_path}: {str(e)}")

    if not xml_files_found:
        print("Warning: No XML files found. Creating model with default configuration.")
        return DynamicModel({})

    return DynamicModel(dict(sections))
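
# Assumed on-disk layout (illustrative; the folder and file names are hypothetical,
# only the .xml suffix and the "subfolder name = section name" rule come from the code):
#   data/
#       encoder/
#           layers.xml
#       decoder/
#           layers.xml
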

def main():
    """
    Main function that demonstrates the creation and training of a dynamic PyTorch model.

    This function:
    1. Creates a dynamic model from XML configurations
    2. Sets up the distributed training environment using Accelerator
    3. Configures optimization components (optimizer, loss function)
    4. Creates a synthetic dataset for demonstration
    5. Runs a distributed training loop with loss tracking

    The model architecture is determined by XML files in the 'data' folder,
    where each subfolder represents a model section containing layer configurations.
    """
    folder_path = 'data'
    model = create_model_from_folder(folder_path)
    print(f"Created dynamic PyTorch model with sections: {list(model.sections.keys())}")

    # Dynamically determine input size from the first layer configuration
    first_section = next(iter(model.sections.keys()))
    first_layer = model.sections[first_section][0]
    input_features = first_layer[0].in_features

    # Validate the model with a sample input (eval mode so BatchNorm accepts a batch of one)
    model.eval()
    with torch.no_grad():
        sample_input = torch.randn(1, input_features)
        output = model(sample_input)
    print(f"Sample output shape: {output.shape}")

    # Initialize distributed training components
    accelerator = Accelerator()

    # Configure training parameters and optimization components
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    num_epochs = 10

    # Generate a synthetic dataset for demonstration purposes
    dataset = torch.utils.data.TensorDataset(
        torch.randn(100, input_features),
        torch.randint(0, 2, (100,))
    )
    train_dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=16,
        shuffle=True
    )

    # Prepare model, optimizer, and dataloader for distributed training
    model, optimizer, train_dataloader = accelerator.prepare(
        model,
        optimizer,
        train_dataloader
    )

    # Execute the training loop with distributed processing
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch_idx, (inputs, labels) in enumerate(train_dataloader):
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            accelerator.backward(loss)
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_dataloader)
        print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")


if __name__ == "__main__":
    main()
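
# Illustrative ways to run this script (the file name app.py is an assumption;
# `accelerate launch` is the standard CLI entry point of the accelerate library):
#   python app.py              # single process; falls back to the default config if ./data is missing
#   accelerate launch app.py   # let Accelerate manage device placement / multiple processes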