#!/usr/bin/env python3
"""
Utility functions for ChipVerifyAI
RTL parsing, metrics calculation, and visualization helpers
"""

import re
from typing import Any, Dict

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class RTLParser:
    """Parse RTL files to extract design features"""

    def __init__(self):
        # Regex patterns for RTL parsing
        self.patterns = {
            'module': r'\bmodule\s+(\w+)',
            'always_block': r'\balways\s*[@\(\)]',
            'assign': r'\bassign\s+',
            'if_statement': r'\bif\s*\(',
            'case_statement': r'\bcase\s*\(',
            'for_loop': r'\bfor\s*\(',
            'function': r'\bfunction\s+',
            'task': r'\btask\s+',
            'signal': r'\b(?:wire|reg|logic)\s+(?:\[[^\]]+\])?\s*(\w+)',
            'clock': r'\b(?:clk|clock)\b',
            'reset': r'\b(?:rst|reset)\b',
            'memory': r'\b(?:ram|rom|memory|mem)\b',
            'fsm': r'\b(?:state|fsm)\b'
        }
        self.compiled_patterns = {k: re.compile(v, re.IGNORECASE)
                                  for k, v in self.patterns.items()}

    def parse_rtl_content(self, content: str) -> Dict[str, Any]:
        """Parse RTL content and extract features"""
        features = {
            'lines_of_code': len(content.splitlines()),
            'module_count': 0,
            'signal_count': 0,
            'always_blocks': 0,
            'assign_statements': 0,
            'if_statements': 0,
            'case_statements': 0,
            'for_loops': 0,
            'function_count': 0,
            'task_count': 0,
            'clock_signals': 0,
            'reset_signals': 0,
            'has_memory': False,
            'has_fsm': False,
            'complexity_score': 0.0
        }
        try:
            # Count occurrences of each construct
            features['module_count'] = len(self.compiled_patterns['module'].findall(content))
            features['always_blocks'] = len(self.compiled_patterns['always_block'].findall(content))
            features['assign_statements'] = len(self.compiled_patterns['assign'].findall(content))
            features['if_statements'] = len(self.compiled_patterns['if_statement'].findall(content))
            features['case_statements'] = len(self.compiled_patterns['case_statement'].findall(content))
            features['for_loops'] = len(self.compiled_patterns['for_loop'].findall(content))
            features['function_count'] = len(self.compiled_patterns['function'].findall(content))
            features['task_count'] = len(self.compiled_patterns['task'].findall(content))

            # Extract signal names (unique declarations only)
            signals = self.compiled_patterns['signal'].findall(content)
            features['signal_count'] = len(set(signals))

            # Check for specific features
            features['clock_signals'] = len(self.compiled_patterns['clock'].findall(content))
            features['reset_signals'] = len(self.compiled_patterns['reset'].findall(content))
            features['has_memory'] = bool(self.compiled_patterns['memory'].search(content))
            features['has_fsm'] = bool(self.compiled_patterns['fsm'].search(content))

            # Calculate complexity score
            features['complexity_score'] = self._calculate_complexity(features)
        except Exception as e:
            print(f"Warning: RTL parsing error: {e}")
        return features

    def _calculate_complexity(self, features: Dict[str, Any]) -> float:
        """Calculate a weighted design complexity score"""
        weights = {
            'lines_of_code': 0.0001,
            'module_count': 0.5,
            'always_blocks': 0.3,
            'if_statements': 0.1,
            'case_statements': 0.2,
            'for_loops': 0.3,
            'function_count': 0.2,
            'task_count': 0.2,
            'has_memory': 1.0,
            'has_fsm': 0.8
        }
        complexity = 0.0
        for feature, weight in weights.items():
            if feature in features:
                value = features[feature]
                if isinstance(value, bool):
                    value = int(value)
                complexity += value * weight
        return round(complexity, 2)
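
# A minimal usage sketch (illustrative only, not part of the original module):
# parse a made-up Verilog counter and print a few of the extracted features.
def _demo_rtl_parser() -> None:
    sample_rtl = """
    module counter(input clk, input rst, output reg [3:0] count);
        always @(posedge clk) begin
            if (rst) count <= 4'd0;
            else count <= count + 4'd1;
        end
    endmodule
    """
    parser = RTLParser()
    features = parser.parse_rtl_content(sample_rtl)
    print(f"modules={features['module_count']} "
          f"always_blocks={features['always_blocks']} "
          f"complexity={features['complexity_score']}")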

class DataPreprocessor:
    """Preprocess data for ML training"""

    def __init__(self):
        self.feature_columns = [
            'lines_of_code', 'module_count', 'signal_count', 'always_blocks',
            'assign_statements', 'if_statements', 'case_statements', 'for_loops',
            'function_count', 'task_count', 'clock_domains', 'reset_signals',
            'interface_signals', 'memory_instances', 'fsm_count', 'pipeline_stages',
            'arithmetic_units', 'complexity_score', 'has_memory', 'has_fsm',
            'has_pipeline', 'has_floating_point', 'is_complex', 'is_large'
        ]

    def preprocess_for_ml(self, df: pd.DataFrame) -> pd.DataFrame:
        """Preprocess a DataFrame for ML training"""
        processed_df = df.copy()

        # Fill missing values (booleans default to False, numerics to the median)
        for col in self.feature_columns:
            if col in processed_df.columns:
                if processed_df[col].dtype == 'bool':
                    processed_df[col] = processed_df[col].fillna(False)
                else:
                    processed_df[col] = processed_df[col].fillna(processed_df[col].median())

        # Convert boolean columns to int
        bool_columns = processed_df.select_dtypes(include=['bool']).columns
        processed_df[bool_columns] = processed_df[bool_columns].astype(int)

        # Remove outliers
        processed_df = self._remove_outliers(processed_df)
        return processed_df

    def _remove_outliers(self, df: pd.DataFrame, threshold: float = 3.0) -> pd.DataFrame:
        """Remove outlier rows using the Z-score method"""
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            std = df[col].std()
            # Skip constant (or all-NaN) columns to avoid division by zero
            if not std or np.isnan(std):
                continue
            z_scores = np.abs((df[col] - df[col].mean()) / std)
            df = df[z_scores < threshold]
        return df
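
# A minimal usage sketch (illustrative only): run a tiny hand-made DataFrame
# through the preprocessor. Column names match feature_columns; the values
# are made up.
def _demo_preprocessor() -> None:
    df = pd.DataFrame({
        'lines_of_code': [120.0, 450.0, np.nan, 300.0],
        'module_count': [1, 3, 2, 2],
        'has_memory': [True, False, True, False],
    })
    clean = DataPreprocessor().preprocess_for_ml(df)
    print(clean)  # NaN replaced by the column median, booleans now 0/1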

def create_risk_dashboard(analysis_results: Dict[str, Any]) -> go.Figure:
    """Create risk assessment dashboard visualization"""
    # Extract data
    risk_score = analysis_results.get('risk_score', 0)
    ml_analysis = analysis_results.get('ml_analysis', {})
    bug_probability = ml_analysis.get('bug_probability', 0) if isinstance(ml_analysis, dict) else 0
    complexity = analysis_results.get('complexity_score', analysis_results.get('complexity_estimate', 0))

    # Create subplots
    fig = make_subplots(
        rows=2, cols=2,
        specs=[[{"type": "indicator"}, {"type": "indicator"}],
               [{"type": "bar"}, {"type": "scatter"}]],
        subplot_titles=("Overall Risk Score", "Bug Probability",
                        "Risk Factors", "Complexity vs Risk")
    )

    # Risk score gauge
    fig.add_trace(
        go.Indicator(
            mode="gauge+number",
            value=risk_score * 100,
            domain={'x': [0, 1], 'y': [0, 1]},
            title={'text': "Risk %"},
            gauge={
                'axis': {'range': [None, 100]},
                'bar': {'color': "darkblue"},
                'steps': [
                    {'range': [0, 40], 'color': "lightgray"},
                    {'range': [40, 70], 'color': "yellow"},
                    {'range': [70, 100], 'color': "red"}
                ],
                'threshold': {
                    'line': {'color': "red", 'width': 4},
                    'thickness': 0.75,
                    'value': 90
                }
            }
        ),
        row=1, col=1
    )

    # Bug probability gauge
    fig.add_trace(
        go.Indicator(
            mode="gauge+number",
            value=bug_probability * 100,
            domain={'x': [0, 1], 'y': [0, 1]},
            title={'text': "Bug Probability %"},
            gauge={
                'axis': {'range': [None, 100]},
                'bar': {'color': "darkred"},
                'steps': [
                    {'range': [0, 30], 'color': "green"},
                    {'range': [30, 60], 'color': "yellow"},
                    {'range': [60, 100], 'color': "red"}
                ]
            }
        ),
        row=1, col=2
    )

    # Risk factors bar chart
    risk_factors = {
        'Complexity': min(1.0, complexity / 10),
        'Size': min(1.0, analysis_results.get('total_lines', 1000) / 20000),
        'ML Prediction': bug_probability,
        'Features': (int(analysis_results.get('has_memory', False)) +
                     int(analysis_results.get('has_fsm', False))) * 0.5
    }
    fig.add_trace(
        go.Bar(
            x=list(risk_factors.keys()),
            y=list(risk_factors.values()),
            marker_color=['blue', 'green', 'red', 'orange'],
            name="Risk Factors"
        ),
        row=2, col=1
    )

    # Complexity vs Risk scatter
    fig.add_trace(
        go.Scatter(
            x=[complexity],
            y=[risk_score],
            mode='markers',
            marker=dict(size=20, color='red', symbol='diamond'),
            name="Current Design",
            text=[f"Risk: {risk_score:.2f}<br>Complexity: {complexity:.2f}"],
            hovertemplate="%{text}<extra></extra>"
        ),
        row=2, col=2
    )

    # Add synthetic reference points for visual context (randomly generated,
    # not measurements of real designs)
    ref_complexities = np.linspace(1, 10, 20)
    ref_risks = 0.1 + 0.7 * (ref_complexities / 10) + np.random.normal(0, 0.05, 20)
    ref_risks = np.clip(ref_risks, 0, 1)
    fig.add_trace(
        go.Scatter(
            x=ref_complexities,
            y=ref_risks,
            mode='markers',
            marker=dict(size=8, color='lightblue', opacity=0.6),
            name="Reference Designs",
            hovertemplate="Complexity: %{x:.1f}<br>Risk: %{y:.2f}<extra></extra>"
        ),
        row=2, col=2
    )

    # Update layout
    fig.update_layout(
        title_text="Chip Design Risk Assessment Dashboard",
        title_x=0.5,
        showlegend=True,
        height=600
    )
    fig.update_xaxes(title_text="Risk Factor", row=2, col=1)
    fig.update_yaxes(title_text="Risk Level", row=2, col=1)
    fig.update_xaxes(title_text="Complexity Score", row=2, col=2)
    fig.update_yaxes(title_text="Risk Score", row=2, col=2)
    return fig
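
# A minimal usage sketch (illustrative only): the dict mirrors the keys
# create_risk_dashboard reads; every value is made up.
def _demo_risk_dashboard() -> None:
    results = {
        'risk_score': 0.42,
        'ml_analysis': {'bug_probability': 0.35},
        'complexity_score': 6.5,
        'total_lines': 12000,
        'has_memory': True,
        'has_fsm': False,
    }
    fig = create_risk_dashboard(results)
    fig.write_html("risk_dashboard.html")  # or fig.show() in a notebook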

def create_coverage_plot(coverage_data: Dict[str, Any]) -> go.Figure:
    """Create coverage analysis visualization"""
    coverage_types = ['Line', 'Branch', 'Toggle', 'Functional', 'Assertion']
    coverage_values = [
        coverage_data.get('line_coverage', 80),
        coverage_data.get('branch_coverage', 75),
        coverage_data.get('toggle_coverage', 70),
        coverage_data.get('functional_coverage', 85),
        coverage_data.get('assertion_coverage', 78)
    ]

    # Create radar chart for coverage
    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=coverage_values,
        theta=coverage_types,
        fill='toself',
        name='Current Coverage',
        line_color='blue'
    ))

    # Add target coverage
    target_coverage = [95, 90, 85, 95, 90]
    fig.add_trace(go.Scatterpolar(
        r=target_coverage,
        theta=coverage_types,
        fill=None,
        name='Target Coverage',
        line_color='red',
        line_dash='dash'
    ))

    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=True,
                range=[0, 100]
            )
        ),
        showlegend=True,
        title="Coverage Analysis"
    )
    return fig
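
# A minimal usage sketch (illustrative only): plot hand-made coverage numbers
# against the hard-coded targets.
def _demo_coverage_plot() -> None:
    coverage = {
        'line_coverage': 88,
        'branch_coverage': 81,
        'toggle_coverage': 74,
        'functional_coverage': 90,
        'assertion_coverage': 79,
    }
    fig = create_coverage_plot(coverage)
    fig.write_html("coverage_plot.html")  # or fig.show() in a notebook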

def calculate_verification_metrics(test_results: Dict[str, Any]) -> Dict[str, float]:
    """Calculate verification quality metrics"""
    metrics = {
        'test_efficiency': 0.0,
        'bug_detection_rate': 0.0,
        'coverage_completeness': 0.0,
        'verification_quality_score': 0.0
    }
    try:
        # Test efficiency: coverage achieved per hour of verification
        coverage = test_results.get('coverage_achieved', 80)
        time_spent = test_results.get('verification_time_hours', 10)
        metrics['test_efficiency'] = coverage / max(1, time_spent)

        # Bug detection rate: bugs found per 100 tests
        bugs_found = test_results.get('bugs_found', 0)
        total_tests = test_results.get('total_tests', 1)
        metrics['bug_detection_rate'] = bugs_found / max(1, total_tests) * 100

        # Coverage completeness: mean of the individual coverage scores
        coverage_types = ['line_coverage', 'branch_coverage', 'functional_coverage']
        coverage_scores = [test_results.get(ct, 0) for ct in coverage_types]
        metrics['coverage_completeness'] = sum(coverage_scores) / len(coverage_scores)

        # Overall verification quality score (heuristic weighted blend)
        metrics['verification_quality_score'] = (
            metrics['test_efficiency'] * 0.3 +
            metrics['coverage_completeness'] * 0.5 +
            (100 - metrics['bug_detection_rate']) * 0.2
        )
    except Exception as e:
        print(f"Error calculating metrics: {e}")
    return metrics
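
# A minimal usage sketch (illustrative only): metrics for a made-up test run.
# Expected: test_efficiency 85/12 ≈ 7.08, bug_detection_rate 3/240*100 = 1.25,
# coverage_completeness (88+80+86)/3 ≈ 84.67.
def _demo_metrics() -> None:
    results = {
        'coverage_achieved': 85,
        'verification_time_hours': 12,
        'bugs_found': 3,
        'total_tests': 240,
        'line_coverage': 88,
        'branch_coverage': 80,
        'functional_coverage': 86,
    }
    print(calculate_verification_metrics(results))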