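"""Exploratory data analysis helpers: EDAAnalyzer builds Plotly figures, summary
statistics, and text insights for a pandas DataFrame."""
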
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.figure_factory as ff
from typing import Dict, List, Any, Tuple
import warnings

warnings.filterwarnings('ignore')

# Import scipy with error handling
try:
    from scipy import stats
    from scipy.stats import chi2_contingency
    SCIPY_AVAILABLE = True
except ImportError:
    SCIPY_AVAILABLE = False

class EDAAnalyzer:
    """Comprehensive Exploratory Data Analysis with advanced visualizations"""

    def __init__(self):
        self.color_palette = [
            '#FFD700', '#FF6B6B', '#4ECDC4', '#45B7D1',
            '#96CEB4', '#FFEAA7', '#DDA0DD', '#98D8C8'
        ]

    def perform_complete_eda(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Perform comprehensive EDA analysis"""
        try:
            results = {
                'overview': self.generate_overview(df),
                'distributions': self.analyze_distributions(df),
                'correlations': self.analyze_correlations(df),
                'insights': self.generate_insights(df),
                'data_quality': self.assess_data_quality(df),
                'advanced_analysis': self.perform_advanced_analysis(df)
            }
            return results
        except Exception as e:
            # Return basic results if advanced analysis fails
            return {
                'overview': self.generate_overview(df),
                'distributions': {},
                'correlations': {},
                'insights': [{'title': 'Analysis Error', 'description': f'Error during analysis: {str(e)}'}],
                'data_quality': {},
                'advanced_analysis': {}
            }

    def generate_overview(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate dataset overview"""
        try:
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            categorical_cols = df.select_dtypes(include=['object', 'category']).columns
            datetime_cols = df.select_dtypes(include=['datetime64']).columns
            overview = {
                'total_rows': len(df),
                'total_columns': len(df.columns),
                'numeric_columns': len(numeric_cols),
                'categorical_columns': len(categorical_cols),
                'datetime_columns': len(datetime_cols),
                'memory_usage': f"{df.memory_usage(deep=True).sum() / 1024**2:.2f} MB",
                'duplicate_rows': df.duplicated().sum(),
                'missing_values_total': df.isnull().sum().sum()
            }
            if len(numeric_cols) > 0:
                overview['summary_stats'] = df[numeric_cols].describe()
            return overview
        except Exception as e:
            return {
                'total_rows': len(df) if df is not None else 0,
                'total_columns': len(df.columns) if df is not None else 0,
                'numeric_columns': 0,
                'categorical_columns': 0,
                'datetime_columns': 0,
                'memory_usage': '0 MB',
                'duplicate_rows': 0,
                'missing_values_total': 0,
                'error': str(e)
            }

    def analyze_distributions(self, df: pd.DataFrame) -> Dict[str, go.Figure]:
        """Analyze data distributions with multiple chart types"""
        distributions = {}
        try:
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            categorical_cols = df.select_dtypes(include=['object', 'category']).columns
            # Numeric distributions
            if len(numeric_cols) > 0:
                distributions.update(self.create_numeric_distributions(df, numeric_cols))
            # Categorical distributions
            if len(categorical_cols) > 0:
                distributions.update(self.create_categorical_distributions(df, categorical_cols))
        except Exception as e:
            distributions['error'] = self.create_error_plot(f"Distribution analysis failed: {str(e)}")
        return distributions

    def create_error_plot(self, error_message: str) -> go.Figure:
        """Create an error plot when analysis fails"""
        fig = go.Figure()
        fig.add_annotation(
            text=error_message,
            xref="paper", yref="paper",
            x=0.5, y=0.5, xanchor='center', yanchor='middle',
            showarrow=False,
            font=dict(size=16, color="red")
        )
        fig.update_layout(
            title="Analysis Error",
            showlegend=False,
            plot_bgcolor='rgba(0,0,0,0)',
            paper_bgcolor='rgba(0,0,0,0)',
            font=dict(color='white')
        )
        return fig

    def create_numeric_distributions(self, df: pd.DataFrame, numeric_cols: List[str]) -> Dict[str, go.Figure]:
        """Create numeric distribution plots"""
        plots = {}
        try:
            # Multi-histogram plot
            if len(numeric_cols) <= 6:
                rows = (len(numeric_cols) + 2) // 3
                fig = make_subplots(
                    rows=rows, cols=3,
                    subplot_titles=list(numeric_cols),
                    vertical_spacing=0.08
                )
                for i, col in enumerate(numeric_cols):
                    row = (i // 3) + 1
                    col_pos = (i % 3) + 1
                    # Filter out non-finite values
                    data = df[col].dropna()
                    if len(data) > 0:
                        fig.add_trace(
                            go.Histogram(
                                x=data,
                                name=col,
                                marker_color=self.color_palette[i % len(self.color_palette)],
                                opacity=0.7,
                                showlegend=False
                            ),
                            row=row, col=col_pos
                        )
                fig.update_layout(
                    title="📊 Numeric Distributions Overview",
                    height=300 * rows,
                    plot_bgcolor='rgba(0,0,0,0)',
                    paper_bgcolor='rgba(0,0,0,0)',
                    font=dict(color='white')
                )
                plots['numeric_histograms'] = fig
            # Box plots for outlier detection
            if len(numeric_cols) > 0:
                fig = go.Figure()
                for i, col in enumerate(numeric_cols[:8]):  # Limit to 8 columns
                    data = df[col].dropna()
                    if len(data) > 0:
                        fig.add_trace(go.Box(
                            y=data,
                            name=col,
                            marker_color=self.color_palette[i % len(self.color_palette)]
                        ))
                fig.update_layout(
                    title="📦 Box Plots - Outlier Detection",
                    height=500,
                    plot_bgcolor='rgba(0,0,0,0)',
                    paper_bgcolor='rgba(0,0,0,0)',
                    font=dict(color='white')
                )
                plots['box_plots'] = fig
            # Violin plots for distribution shapes
            if len(numeric_cols) > 0:
                fig = go.Figure()
                for i, col in enumerate(numeric_cols[:6]):
                    data = df[col].dropna()
                    if len(data) > 1:  # Need at least 2 points for a violin plot
                        fig.add_trace(go.Violin(
                            y=data,
                            name=col,
                            box_visible=True,
                            meanline_visible=True,
                            fillcolor=self.color_palette[i % len(self.color_palette)],
                            opacity=0.6
                        ))
                fig.update_layout(
                    title="🎻 Violin Plots - Distribution Shapes",
                    height=500,
                    plot_bgcolor='rgba(0,0,0,0)',
                    paper_bgcolor='rgba(0,0,0,0)',
                    font=dict(color='white')
                )
                plots['violin_plots'] = fig
        except Exception as e:
            plots['numeric_error'] = self.create_error_plot(f"Numeric distribution error: {str(e)}")
        return plots

    def create_categorical_distributions(self, df: pd.DataFrame, categorical_cols: List[str]) -> Dict[str, go.Figure]:
        """Create categorical distribution plots"""
        plots = {}
        try:
            # Bar charts for categorical variables
            for i, col in enumerate(categorical_cols[:4]):  # Limit to 4 columns
                value_counts = df[col].value_counts().head(15)  # Top 15 categories
                if len(value_counts) > 0:
                    fig = go.Figure(data=[
                        go.Bar(
                            x=value_counts.index.astype(str),
                            y=value_counts.values,
                            marker_color=self.color_palette[i % len(self.color_palette)],
                            text=value_counts.values,
                            textposition='auto'
                        )
                    ])
                    fig.update_layout(
                        title=f"📊 {col} - Value Distribution",
                        xaxis_title=col,
                        yaxis_title="Count",
                        height=400,
                        plot_bgcolor='rgba(0,0,0,0)',
                        paper_bgcolor='rgba(0,0,0,0)',
                        font=dict(color='white')
                    )
                    plots[f'categorical_{col}'] = fig
            # Pie chart for the first categorical variable
            if len(categorical_cols) > 0:
                col = categorical_cols[0]
                value_counts = df[col].value_counts().head(10)
                if len(value_counts) > 0:
                    fig = go.Figure(data=[go.Pie(
                        labels=value_counts.index.astype(str),
                        values=value_counts.values,
                        hole=0.3,
                        marker_colors=self.color_palette
                    )])
                    fig.update_layout(
                        title=f"🥧 {col} - Proportion Analysis",
                        height=500,
                        plot_bgcolor='rgba(0,0,0,0)',
                        paper_bgcolor='rgba(0,0,0,0)',
                        font=dict(color='white')
                    )
                    plots['pie_chart'] = fig
        except Exception as e:
            plots['categorical_error'] = self.create_error_plot(f"Categorical distribution error: {str(e)}")
        return plots

    def analyze_correlations(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Analyze correlations between variables"""
        correlations = {}
        try:
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) > 1:
                # Correlation matrix
                corr_matrix = df[numeric_cols].corr()
                # Heatmap
                fig = go.Figure(data=go.Heatmap(
                    z=corr_matrix.values,
                    x=corr_matrix.columns,
                    y=corr_matrix.columns,
                    colorscale='RdYlBu',
                    zmid=0,
                    text=np.round(corr_matrix.values, 2),
                    texttemplate="%{text}",
                    textfont={"size": 10},
                    colorbar=dict(title="Correlation")
                ))
                fig.update_layout(
                    title="🔥 Correlation Heatmap",
                    height=max(400, len(numeric_cols) * 30),
                    plot_bgcolor='rgba(0,0,0,0)',
                    paper_bgcolor='rgba(0,0,0,0)',
                    font=dict(color='white')
                )
                correlations['heatmap'] = fig
                # Top correlations: mask the upper triangle so each pair is counted once
                mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
                corr_matrix_masked = corr_matrix.mask(mask)
                # Get top positive and negative correlations
                corr_pairs = []
                for i in range(len(corr_matrix_masked.columns)):
                    for j in range(len(corr_matrix_masked.columns)):
                        if pd.notna(corr_matrix_masked.iloc[i, j]):
                            corr_pairs.append({
                                'Variable 1': corr_matrix_masked.columns[i],
                                'Variable 2': corr_matrix_masked.columns[j],
                                'Correlation': corr_matrix_masked.iloc[i, j]
                            })
                if corr_pairs:
                    corr_df = pd.DataFrame(corr_pairs)
                    corr_df = corr_df.reindex(corr_df['Correlation'].abs().sort_values(ascending=False).index)
                    correlations['top_correlations'] = corr_df.head(10)
                    # Scatter plot matrix for the top correlated variables
                    if len(numeric_cols) >= 2:
                        top_corr_cols = corr_df.head(3)[['Variable 1', 'Variable 2']].values.flatten()
                        unique_cols = list(set(top_corr_cols))[:4]  # Max 4 variables
                        if len(unique_cols) >= 2:
                            try:
                                fig = px.scatter_matrix(
                                    df[unique_cols].dropna(),
                                    dimensions=unique_cols,
                                    color_discrete_sequence=self.color_palette
                                )
                                fig.update_layout(
                                    title="🎯 Scatter Plot Matrix - Top Correlated Variables",
                                    height=600,
                                    plot_bgcolor='rgba(0,0,0,0)',
                                    paper_bgcolor='rgba(0,0,0,0)',
                                    font=dict(color='white')
                                )
                                correlations['scatter_matrix'] = fig
                            except Exception:
                                pass  # Skip if the scatter matrix fails
        except Exception as e:
            correlations['error'] = f"Correlation analysis failed: {str(e)}"
        return correlations

    def generate_insights(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """Generate AI-powered insights about the data"""
        insights = []
        try:
            # Basic statistics insights
            insights.append({
                'title': '📊 Dataset Overview',
                'description': f"Dataset contains {len(df):,} rows and {len(df.columns)} columns. "
                               f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB. "
                               f"Missing values: {df.isnull().sum().sum():,} ({df.isnull().sum().sum() / df.size * 100:.1f}%)."
            })
            # Numeric columns insights
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) > 0:
                try:
                    # Find columns with high variance
                    variances = df[numeric_cols].var().sort_values(ascending=False)
                    high_var_col = variances.index[0]
                    insights.append({
                        'title': '📈 Variance Analysis',
                        'description': f"'{high_var_col}' shows the highest variance ({variances.iloc[0]:.2f}), "
                                       f"indicating significant spread in values. This column might contain outliers "
                                       f"or represent a key differentiating factor in your dataset."
                    })
                    # Skewness analysis
                    skewed_cols = []
                    for col in numeric_cols:
                        try:
                            skewness = df[col].skew()
                            if abs(skewness) > 1:
                                skewed_cols.append((col, skewness))
                        except Exception:
                            continue
                    if skewed_cols:
                        # Sort so the reported column really is the most skewed one
                        skewed_cols.sort(key=lambda pair: abs(pair[1]), reverse=True)
                        insights.append({
                            'title': '📉 Distribution Skewness',
                            'description': f"Found {len(skewed_cols)} heavily skewed columns. "
                                           f"Most skewed: '{skewed_cols[0][0]}' (skewness: {skewed_cols[0][1]:.2f}). "
                                           f"Consider log transformation or outlier treatment for better modeling."
                        })
                except Exception:
                    pass
            # Categorical insights
            categorical_cols = df.select_dtypes(include=['object', 'category']).columns
            if len(categorical_cols) > 0:
                try:
                    cardinalities = []
                    for col in categorical_cols:
                        unique_count = df[col].nunique()
                        cardinalities.append((col, unique_count))
                    cardinalities.sort(key=lambda x: x[1], reverse=True)
                    insights.append({
                        'title': '🏷️ Categorical Analysis',
                        'description': f"'{cardinalities[0][0]}' has the highest cardinality ({cardinalities[0][1]} unique values). "
                                       f"High cardinality columns might need encoding strategies for machine learning. "
                                       f"Consider grouping rare categories or using embedding techniques."
                    })
                except Exception:
                    pass
            # Missing data patterns
            try:
                missing_data = df.isnull().sum()
                missing_cols = missing_data[missing_data > 0].sort_values(ascending=False)
                if len(missing_cols) > 0:
                    insights.append({
                        'title': '❓ Missing Data Patterns',
                        'description': f"'{missing_cols.index[0]}' has the most missing values ({missing_cols.iloc[0]:,} - "
                                       f"{missing_cols.iloc[0] / len(df) * 100:.1f}%). "
                                       f"Analyze if missing data is random or systematic. "
                                       f"Consider imputation strategies or feature engineering."
                    })
            except Exception:
                pass
            # Correlation insights
            if len(numeric_cols) > 1:
                try:
                    corr_matrix = df[numeric_cols].corr()
                    mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
                    corr_matrix_masked = corr_matrix.mask(mask)
                    max_corr = 0
                    max_pair = None
                    for i in range(len(corr_matrix_masked.columns)):
                        for j in range(len(corr_matrix_masked.columns)):
                            if pd.notna(corr_matrix_masked.iloc[i, j]):
                                if abs(corr_matrix_masked.iloc[i, j]) > abs(max_corr):
                                    max_corr = corr_matrix_masked.iloc[i, j]
                                    max_pair = (corr_matrix_masked.columns[i], corr_matrix_masked.columns[j])
                    if max_pair and abs(max_corr) > 0.5:
                        insights.append({
                            'title': '🔗 Strong Correlations',
                            'description': f"Strong correlation found between '{max_pair[0]}' and '{max_pair[1]}' "
                                           f"(r = {max_corr:.3f}). This suggests potential multicollinearity. "
                                           f"Consider feature selection or dimensionality reduction techniques."
                        })
                except Exception:
                    pass
        except Exception as e:
            insights.append({
                'title': 'Analysis Error',
                'description': f"Error generating insights: {str(e)}"
            })
        return insights

    def assess_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Assess data quality with visualizations"""
        quality = {}
        try:
            # Missing values by column (bar chart)
            if df.isnull().sum().sum() > 0:
                missing_data = df.isnull().sum().sort_values(ascending=False)
                missing_data = missing_data[missing_data > 0]
                if len(missing_data) > 0:
                    fig = go.Figure([go.Bar(
                        x=missing_data.index,
                        y=missing_data.values,
                        marker_color='#FF6B6B',
                        text=missing_data.values,
                        textposition='auto'
                    )])
                    fig.update_layout(
                        title="❓ Missing Values by Column",
                        xaxis_title="Columns",
                        yaxis_title="Missing Count",
                        height=400,
                        plot_bgcolor='rgba(0,0,0,0)',
                        paper_bgcolor='rgba(0,0,0,0)',
                        font=dict(color='white')
                    )
                    quality['missing_values'] = fig
            # Data types distribution
            dtype_counts = df.dtypes.value_counts()
            if len(dtype_counts) > 0:
                fig = go.Figure(data=[go.Pie(
                    labels=[str(dtype) for dtype in dtype_counts.index],
                    values=dtype_counts.values,
                    hole=0.3,
                    marker_colors=self.color_palette
                )])
                fig.update_layout(
                    title="🔧 Data Types Distribution",
                    height=400,
                    plot_bgcolor='rgba(0,0,0,0)',
                    paper_bgcolor='rgba(0,0,0,0)',
                    font=dict(color='white')
                )
                quality['data_types'] = fig
            # Duplicate analysis
            duplicates = df.duplicated().sum()
            if duplicates > 0:
                quality['duplicates'] = {
                    'count': duplicates,
                    'percentage': duplicates / len(df) * 100
                }
        except Exception as e:
            quality['error'] = f"Data quality assessment failed: {str(e)}"
        return quality

    def perform_advanced_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Perform advanced statistical analysis"""
        advanced = {}
        try:
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            # Outlier detection using the IQR method
            if len(numeric_cols) > 0:
                outlier_counts = {}
                for col in numeric_cols:
                    try:
                        data = df[col].dropna()
                        if len(data) > 0:
                            Q1 = data.quantile(0.25)
                            Q3 = data.quantile(0.75)
                            IQR = Q3 - Q1
                            lower_bound = Q1 - 1.5 * IQR
                            upper_bound = Q3 + 1.5 * IQR
                            # Count outliers on the NaN-free series for consistency
                            outliers = data[(data < lower_bound) | (data > upper_bound)]
                            outlier_counts[col] = len(outliers)
                    except Exception:
                        outlier_counts[col] = 0
                if outlier_counts:
                    outlier_df = pd.DataFrame(list(outlier_counts.items()),
                                              columns=['Column', 'Outlier_Count'])
                    outlier_df = outlier_df.sort_values('Outlier_Count', ascending=False)
                    advanced['outliers'] = outlier_df
            # Statistical tests
            categorical_cols = df.select_dtypes(include=['object', 'category']).columns
            if len(categorical_cols) >= 2 and SCIPY_AVAILABLE:
                try:
                    col1, col2 = categorical_cols[0], categorical_cols[1]
                    contingency_table = pd.crosstab(df[col1], df[col2])
                    if contingency_table.shape[0] > 1 and contingency_table.shape[1] > 1:
                        chi2, p_value, dof, expected = chi2_contingency(contingency_table)
                        advanced['chi_square_test'] = {
                            'variables': [col1, col2],
                            'chi2_statistic': chi2,
                            'p_value': p_value,
                            'interpretation': 'Dependent' if p_value < 0.05 else 'Independent'
                        }
                except Exception:
                    pass  # Skip if the test fails
        except Exception as e:
            advanced['error'] = f"Advanced analysis failed: {str(e)}"
        return advanced
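

# Minimal usage sketch (editorial assumption: the Space presumably drives EDAAnalyzer
# from its UI; the synthetic DataFrame and column names below are illustrative only).
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    demo = pd.DataFrame({
        'price': rng.lognormal(3.0, 0.5, 200),                   # right-skewed numeric
        'quantity': rng.integers(1, 20, 200),                    # integer numeric
        'region': rng.choice(['north', 'south', 'east'], 200),   # low-cardinality categorical
    })
    report = EDAAnalyzer().perform_complete_eda(demo)
    print(report['overview']['total_rows'], report['overview']['total_columns'])
    print([insight['title'] for insight in report['insights']])
    # Each value in report['distributions'] is a plotly Figure; render one if present
    fig = report['distributions'].get('box_plots')
    if fig is not None:
        fig.show()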