import gradio as gr
import json
import math
import logging

import numpy as np
import pandas as pd
import plotly.graph_objects as go

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Safely parse JSON input, raising a user-facing error on malformed input
def parse_input(json_input):
    logger.debug("Attempting to parse input: %s", json_input)
    try:
        data = json.loads(json_input)
        logger.debug("Successfully parsed as JSON")
        return data
    except json.JSONDecodeError as e:
        truncated = json_input[:100] + "..." if len(json_input) > 100 else json_input
        logger.error("JSON parsing failed: %s (Input: %s)", str(e), truncated)
        raise ValueError(f"Malformed JSON: {str(e)}. Use double quotes for property names (e.g., \"content\") and ensure valid JSON format.")
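# A minimal example of the expected payload shape (mirroring the placeholder JSON used in the
# UI below; the "bytes" field is optional and ignored by this app):
# {"content": [{"token": ",", "logprob": 0.0,
#               "top_logprobs": {" so": -13.8046875, ".": -13.8046875, ",": -13.640625}}]}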
# Coerce a logprob value to float, defaulting to 0.0 for None, invalid strings, or other types
def ensure_float(value):
    if value is None:
        logger.debug("Replacing None logprob with 0.0")
        return 0.0
    if isinstance(value, str):
        try:
            return float(value)
        except ValueError:
            logger.error("Failed to convert string '%s' to float", value)
            return 0.0
    if isinstance(value, (int, float)):
        return float(value)
    return 0.0  # Default for any other type
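# Examples: ensure_float(None) -> 0.0, ensure_float("-1.5") -> -1.5, ensure_float([1]) -> 0.0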
# Get the token for an entry, defaulting to "Unknown" if missing
def get_token(entry):
    token = entry.get("token", "Unknown")
    if token == "Unknown":
        logger.warning("Missing 'token' key for entry: %s, using 'Unknown'", entry)
    return token
# Create an empty Plotly figure with a title (used as a placeholder on errors or empty data)
def create_empty_figure(title):
    return go.Figure().update_layout(title=title, xaxis_title="", yaxis_title="", showlegend=False)
# Shared extraction: pull tokens, logprobs, and sorted top alternatives out of the raw JSON
def extract_token_data(json_input):
    data = parse_input(json_input)
    content = data.get("content", []) if isinstance(data, dict) else data
    if not isinstance(content, list):
        raise ValueError("Content must be a list of entries")
    tokens, logprobs, top_alternatives = [], [], []
    for entry in content:
        if not isinstance(entry, dict):
            logger.warning("Skipping non-dictionary entry: %s", entry)
            continue
        logprob = ensure_float(entry.get("logprob", None))
        if logprob >= -100000:  # Fixed filter; None/invalid values already defaulted to 0.0
            tokens.append(get_token(entry))
            logprobs.append(logprob)
            top_probs = entry.get("top_logprobs", {}) or {}
            finite_top_probs = []
            for key, value in top_probs.items():
                float_value = ensure_float(value)  # Never None, so only finiteness needs checking
                if math.isfinite(float_value):
                    finite_top_probs.append((key, float_value))
            # Sort alternatives by log probability, most likely first
            finite_top_probs.sort(key=lambda x: x[1], reverse=True)
            top_alternatives.append(finite_top_probs)
    return tokens, logprobs, top_alternatives

# Precompute the next chunk (synchronous, to suit Hugging Face Spaces)
def precompute_chunk(json_input, chunk_size, current_chunk):
    try:
        tokens, logprobs, top_alternatives = extract_token_data(json_input)
        if not tokens or not logprobs:
            return None, None, None
        next_chunk = current_chunk + 1
        start_idx = next_chunk * chunk_size
        end_idx = min((next_chunk + 1) * chunk_size, len(tokens))
        if start_idx >= len(tokens):
            return None, None, None
        return tokens[start_idx:end_idx], logprobs[start_idx:end_idx], top_alternatives[start_idx:end_idx]
    except Exception as e:
        logger.error("Precomputation failed for chunk %d: %s", current_chunk + 1, str(e))
        return None, None, None
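# Chunk-indexing example: with chunk_size=100 and current_chunk=0, precompute_chunk returns
# tokens[100:200] (the second chunk), or (None, None, None) once start_idx passes the end.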
# Process and visualize one chunk of log probs with a dynamic number of top_logprobs
def visualize_logprobs(json_input, chunk=0, chunk_size=100):
    try:
        tokens, logprobs, top_alternatives = extract_token_data(json_input)
        if not logprobs or not tokens:
            return (create_empty_figure("Log Probabilities of Generated Tokens"), None, "No tokens to display.",
                    create_empty_figure("Top Token Log Probabilities"), create_empty_figure("Significant Probability Drops"), 1, 0)
        total_chunks = max(1, (len(logprobs) + chunk_size - 1) // chunk_size)  # Ceiling division
        start_idx = chunk * chunk_size
        end_idx = min((chunk + 1) * chunk_size, len(logprobs))
        paginated_tokens = tokens[start_idx:end_idx]
        paginated_logprobs = logprobs[start_idx:end_idx]
        paginated_alternatives = top_alternatives[start_idx:end_idx]
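        # Pagination example: 250 tokens with chunk_size=100 gives total_chunks = 3,
        # and chunk=2 covers token indices 200..249 (ceiling division above).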
        # Main log probability plot (interactive Plotly)
        main_fig = go.Figure()
        main_fig.add_trace(go.Scatter(x=list(range(len(paginated_logprobs))), y=paginated_logprobs,
                                      mode='markers+lines', name='Log Prob', marker=dict(color='blue')))
        main_fig.update_layout(
            title=f"Log Probabilities of Generated Tokens (Chunk {chunk + 1})",
            xaxis_title="Token Position (within chunk)",
            yaxis_title="Log Probability",
            hovermode="closest",
            clickmode='event+select'
        )
        main_fig.update_traces(
            customdata=[f"Token: {tok}, Log Prob: {prob:.4f}, Position: {i + start_idx}"
                        for i, (tok, prob) in enumerate(zip(paginated_tokens, paginated_logprobs))],
            hovertemplate='<b>%{customdata}</b><extra></extra>'
        )
        # Probability drop analysis (interactive Plotly); drops[i] = logprob[i+1] - logprob[i]
        if len(paginated_logprobs) < 2:
            drops_fig = create_empty_figure(f"Significant Probability Drops (Chunk {chunk + 1})")
        else:
            drops = [paginated_logprobs[i + 1] - paginated_logprobs[i] for i in range(len(paginated_logprobs) - 1)]
            drops_fig = go.Figure()
            drops_fig.add_trace(go.Bar(x=list(range(len(drops))), y=drops, name='Drop', marker_color='red'))
            drops_fig.update_layout(
                title=f"Significant Probability Drops (Chunk {chunk + 1})",
                xaxis_title="Token Position (within chunk)",
                yaxis_title="Log Probability Drop",
                hovermode="closest",
                clickmode='event+select'
            )
            drops_fig.update_traces(
                customdata=[f"Drop: {drop:.4f}, From: {paginated_tokens[i]} to {paginated_tokens[i + 1]}, Position: {i + start_idx}"
                            for i, drop in enumerate(drops)],
                hovertemplate='<b>%{customdata}</b><extra></extra>'
            )
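        # Reading the drops chart: a strongly negative bar means confidence fell sharply
        # from one token to the next; positive bars mark recoveries.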
        # Build the table of tokens and their top alternatives from the already-filtered
        # chunk data, so table rows stay aligned with the plotted tokens
        max_alternatives = max((len(alts) for alts in paginated_alternatives), default=0)
        table_data = []
        for token, logprob, alts in zip(paginated_tokens, paginated_logprobs, paginated_alternatives):
            row = [token, f"{logprob:.4f}"]
            row.extend(f"{alt_token}: {alt_logprob:.4f}" for alt_token, alt_logprob in alts[:max_alternatives])
            row.extend([""] * (2 + max_alternatives - len(row)))  # Pad short rows
            table_data.append(row)
        df = pd.DataFrame(table_data, columns=["Token", "Log Prob"] + [f"Alt {i + 1}" for i in range(max_alternatives)]) if table_data else None
        # Generate colored text for the current chunk (red = low confidence, green = high)
        if paginated_logprobs:
            min_logprob = min(paginated_logprobs)
            max_logprob = max(paginated_logprobs)
            if max_logprob == min_logprob:
                normalized_probs = [0.5] * len(paginated_logprobs)
            else:
                normalized_probs = [(lp - min_logprob) / (max_logprob - min_logprob) for lp in paginated_logprobs]
            colored_spans = []
            for token, norm_prob in zip(paginated_tokens, normalized_probs):
                r = int(255 * (1 - norm_prob))  # Red channel for low confidence
                g = int(255 * norm_prob)        # Green channel for high confidence
                colored_spans.append(f'<span style="color: rgb({r}, {g}, 0); font-weight: bold;">{token}</span>')
            colored_text_html = f"<p>{' '.join(colored_spans)}</p>"
        else:
            colored_text_html = "No tokens to display in this chunk."
        # Top token log probabilities (interactive Plotly, dynamic length, current chunk)
        if not paginated_alternatives:
            alt_viz_fig = create_empty_figure(f"Top Token Log Probabilities (Chunk {chunk + 1})")
        else:
            alt_viz_fig = go.Figure()
            colors = ['blue', 'green', 'red', 'purple', 'orange']
            for i, (token, probs) in enumerate(zip(paginated_tokens, paginated_alternatives)):
                for j, (alt_tok, prob) in enumerate(probs):
                    # One bar per alternative, colored by rank; customdata is attached per trace
                    # so each bar's hover text describes its own alternative
                    alt_viz_fig.add_trace(go.Bar(
                        x=[f"{token} (Pos {i + start_idx})"], y=[prob], name=f"{alt_tok}",
                        marker_color=colors[j % len(colors)],
                        customdata=[f"Token: {token}, Alt: {alt_tok}, Log Prob: {prob:.4f}, Position: {i + start_idx}"],
                        hovertemplate='<b>%{customdata}</b><extra></extra>'
                    ))
            alt_viz_fig.update_layout(
                title=f"Top Token Log Probabilities (Chunk {chunk + 1})",
                xaxis_title="Token (Position)",
                yaxis_title="Log Probability",
                barmode='stack',
                hovermode="closest",
                clickmode='event+select'
            )
        return (main_fig, df, colored_text_html, alt_viz_fig, drops_fig, total_chunks, chunk)
    except Exception as e:
        logger.error("Visualization failed: %s", str(e))
        return (create_empty_figure("Log Probabilities of Generated Tokens"), None, f"Error: {e}",
                create_empty_figure("Top Token Log Probabilities"), create_empty_figure("Significant Probability Drops"), 1, 0)
# Analysis functions for telling correct traces apart from incorrect ones
def analyze_confidence_signature(logprobs, tokens):
    if not logprobs or not tokens:
        return "No data for confidence signature analysis.", None
    # Extract the top-ranked log probability at each position
    top_probs = [lps[0][1] if lps and lps[0][1] is not None else -float('inf') for lps in logprobs]
    if not any(p != -float('inf') for p in top_probs):
        return "No valid log probabilities for confidence analysis.", None
    # A wide moving-average window smooths out token-level noise
    window_size = 30
    if len(top_probs) < window_size + 1:
        return "Trace too short for confidence signature analysis.", None
    moving_avg = np.convolve(top_probs, np.ones(window_size) / window_size, mode='valid')
    # Changes between consecutive window averages
    drops = np.diff(moving_avg)
    # Adaptive threshold: only flag drops in the bottom 5% of all changes
    drop_threshold = np.percentile(drops, 5)
    significant_drops = np.where(drops < drop_threshold)[0]
    # Cluster nearby drops (within 10 tokens) so one reasoning shift is reported once
    if len(significant_drops) > 0:
        clustered_drops = [significant_drops[0]]
        for drop in significant_drops[1:]:
            if drop - clustered_drops[-1] > 10:  # At least 10 tokens apart
                clustered_drops.append(drop)
    else:
        clustered_drops = []
    # Keep only drops that sit near reasoning-transition markers
    filtered_drops = []
    reasoning_markers = ["therefore", "thus", "so", "hence", "wait", "but", "however", "actually"]
    for drop in clustered_drops:
        token_idx = drop + window_size - 1  # Map the smoothed index back to a token index
        # Check the surrounding context (10 tokens before and after)
        start_idx = max(0, token_idx - 10)
        end_idx = min(len(tokens), token_idx + 10)
        context = " ".join(tokens[start_idx:end_idx])
        if any(marker in context.lower() for marker in reasoning_markers):
            drop_magnitude = drops[drop]
            filtered_drops.append((token_idx, drop_magnitude, tokens[token_idx] if token_idx < len(tokens) else "End of trace"))
    # Ascending sort: drops are negative, so the steepest drops come first
    filtered_drops.sort(key=lambda x: x[1])
    if not filtered_drops:
        return "No significant confidence shifts at reasoning transitions detected.", None
    # Report at most the 3 most significant drops
    return "Significant confidence shifts detected at reasoning transitions:", filtered_drops[:3]
def detect_interpretation_pivots(logprobs, tokens):
    if not logprobs or not tokens:
        return "No data for interpretation pivot detection.", None
    pivots = []
    reconsideration_tokens = ["wait", "but", "actually", "however", "hmm"]
    for i, (token, lps) in enumerate(zip(tokens, logprobs)):
        if not lps:
            continue
        for rt in reconsideration_tokens:
            for t, p in lps:
                if t.lower() == rt and p > -2.5:  # Reconsideration token carries high probability
                    context = tokens[max(0, i - 50):i]
                    pivots.append((i, rt, context))
    if not pivots:
        return "No interpretation pivots detected.", None
    return "Interpretation pivots detected:", pivots
def calculate_decision_entropy(logprobs):
    if not logprobs:
        return "No data for entropy spike detection.", None
    entropies = []
    for lps in logprobs:
        if not lps:
            entropies.append(0.0)
            continue
        # Convert log probs to probabilities (None values were filtered out upstream)
        probs = [math.exp(p) for _, p in lps if p is not None]
        if not probs or sum(probs) == 0:
            entropies.append(0.0)
            continue
        # Shannon entropy over the top-k alternatives: H = -sum(p * ln p)
        entropy = -sum(p * math.log(p) for p in probs if p > 0)
        entropies.append(entropy)
    baseline = np.percentile(entropies, 75) if entropies else 0.0
    # Flag positions whose entropy exceeds 1.5x the 75th-percentile baseline
    spikes = [i for i, e in enumerate(entropies) if e > baseline * 1.5 and baseline > 0]
    if not spikes:
        return "No entropy spikes detected at decision points.", None
    return "Entropy spikes detected at positions:", spikes
def analyze_conclusion_competition(logprobs, tokens):
    if not logprobs or not tokens:
        return "No data for conclusion competition analysis.", None
    conclusion_indices = [i for i, t in enumerate(tokens)
                          if any(marker in t.lower() for marker in ["therefore", "thus", "boxed", "answer"])]
    if not conclusion_indices:
        return "No conclusion markers found in trace.", None
    # Measure the gap between the top two alternatives near the final conclusion marker
    gaps = []
    conclusion_idx = conclusion_indices[-1]
    end_range = min(conclusion_idx + 50, len(logprobs))
    for idx in range(conclusion_idx, end_range):
        if idx < len(logprobs) and len(logprobs[idx]) >= 2 and logprobs[idx][0][1] is not None and logprobs[idx][1][1] is not None:
            gaps.append(logprobs[idx][0][1] - logprobs[idx][1][1])
    if not gaps:
        return "No conclusion competition data available.", None
    mean_gap = np.mean(gaps)
    return f"Mean probability gap at conclusion: {mean_gap:.4f} (higher indicates a more confident conclusion)", None
def analyze_verification_signals(logprobs, tokens):
    if not logprobs or not tokens:
        return "No data for verification signal analysis.", None
    verification_terms = ["verify", "check", "confirm", "ensure", "double"]
    verification_probs = []
    for lps in logprobs:
        if not lps:
            continue
        # Track the highest-probability verification-related alternative at each position
        max_v_prob = -float('inf')
        for token, prob in lps:
            if prob is not None and any(v_term in token.lower() for v_term in verification_terms):
                max_v_prob = max(max_v_prob, prob)
        if max_v_prob > -float('inf'):
            verification_probs.append(max_v_prob)
    if not verification_probs:
        return "No verification signals detected.", None
    count, mean_prob = len(verification_probs), np.mean(verification_probs)
    return f"Verification signals found: {count} instances, mean probability: {mean_prob:.4f}", None
def detect_semantic_inversions(logprobs, tokens):
    if not logprobs or not tokens:
        return "No data for semantic inversion detection.", None
    inversion_pairs = [("more", "less"), ("larger", "smaller"), ("winning", "losing"),
                       ("increase", "decrease"), ("greater", "lesser"), ("positive", "negative")]
    inversions = []
    for i, (token, lps) in enumerate(zip(tokens, logprobs)):
        if not lps:
            continue
        for pos, neg in inversion_pairs:
            if token.lower() == pos:
                for t, p in lps:
                    # Check for None before comparing, so the comparison never sees None
                    if t.lower() == neg and p is not None and p > -3.0:
                        inversions.append((i, pos, neg, p))
            elif token.lower() == neg:
                for t, p in lps:
                    if t.lower() == pos and p is not None and p > -3.0:
                        inversions.append((i, neg, pos, p))
    if not inversions:
        return "No semantic inversions detected.", None
    return "Semantic inversions detected:", inversions
# Run the full suite of trace analyses and render the results as HTML
def analyze_full_trace(json_input):
    try:
        # The analysis functions expect, per token, the sorted list of (token, logprob) alternatives
        tokens, _, top_alternatives = extract_token_data(json_input)
        if not top_alternatives or not tokens:
            return "No valid data for trace analysis.", None, None, None, None, None
        confidence_result, confidence_data = analyze_confidence_signature(top_alternatives, tokens)
        pivot_result, pivot_data = detect_interpretation_pivots(top_alternatives, tokens)
        entropy_result, entropy_data = calculate_decision_entropy(top_alternatives)
        conclusion_result, conclusion_data = analyze_conclusion_competition(top_alternatives, tokens)
        verification_result, verification_data = analyze_verification_signals(top_alternatives, tokens)
        inversion_result, inversion_data = detect_semantic_inversions(top_alternatives, tokens)
        analysis_html = f"""
        <h3>Trace Analysis Results</h3>
        <ul>
            <li><strong>Confidence Signature:</strong> {confidence_result}</li>
            {f"<ul><li>Positions: {', '.join(str(pos) for pos, _, _ in confidence_data)}</li></ul>" if confidence_data else ""}
            <li><strong>Interpretation Pivots:</strong> {pivot_result}</li>
            {f"<ul><li>Positions: {', '.join(str(pos) for pos, _, _ in pivot_data)}</li></ul>" if pivot_data else ""}
            <li><strong>Decision Entropy Spikes:</strong> {entropy_result}</li>
            {f"<ul><li>Positions: {', '.join(str(pos) for pos in entropy_data)}</li></ul>" if entropy_data else ""}
            <li><strong>Conclusion Competition:</strong> {conclusion_result}</li>
            <li><strong>Verification Signals:</strong> {verification_result}</li>
            <li><strong>Semantic Inversions:</strong> {inversion_result}</li>
            {f"<ul><li>Positions: {', '.join(str(pos) for pos, _, _, _ in inversion_data)}</li></ul>" if inversion_data else ""}
        </ul>
        """
        return analysis_html, None, None, None, None, None
    except Exception as e:
        logger.error("Trace analysis failed: %s", str(e))
        return f"Error: {e}", None, None, None, None, None
# Gradio interface with two tabs
try:
    with gr.Blocks(title="Log Probability Visualizer") as app:
        gr.Markdown("# Log Probability Visualizer")
        gr.Markdown(
            "Paste your JSON logprob data below to analyze reasoning traces or visualize tokens in chunks of 100. "
            "Entries are kept under a fixed filter (logprob >= -100000), the number of top_logprobs per token is "
            "dynamic, and missing or null fields are handled gracefully."
        )
        with gr.Tabs():
            with gr.Tab("Trace Analysis"):
                with gr.Row():
                    json_input_analysis = gr.Textbox(
                        label="JSON Input for Trace Analysis",
                        lines=10,
                        placeholder='{"content": [{"bytes": [44], "logprob": 0.0, "token": ",", "top_logprobs": {" so": -13.8046875, ".": -13.8046875, ",": -13.640625}}]}'
                    )
                with gr.Row():
                    analysis_output = gr.HTML(label="Trace Analysis Results")
                btn_analyze = gr.Button("Analyze Trace")
                btn_analyze.click(
                    fn=analyze_full_trace,
                    inputs=[json_input_analysis],
                    # Dummy states absorb the five unused return values
                    outputs=[analysis_output, gr.State(), gr.State(), gr.State(), gr.State(), gr.State()],
                )
            with gr.Tab("Visualization"):
                with gr.Row():
                    json_input_viz = gr.Textbox(
                        label="JSON Input for Visualization",
                        lines=10,
                        placeholder='{"content": [{"bytes": [44], "logprob": 0.0, "token": ",", "top_logprobs": {" so": -13.8046875, ".": -13.8046875, ",": -13.640625}}]}'
                    )
                    chunk = gr.Number(value=0, label="Current Chunk", precision=0, minimum=0)
                with gr.Row():
                    plot_output = gr.Plot(label="Log Probability Plot (Click for Tokens)")
                    drops_output = gr.Plot(label="Probability Drops (Click for Details)")
                with gr.Row():
                    table_output = gr.Dataframe(label="Token Log Probabilities and Top Alternatives")
                    alt_viz_output = gr.Plot(label="Top Token Log Probabilities (Click for Details)")
                with gr.Row():
                    text_output = gr.HTML(label="Colored Text (Confidence Visualization)")
                with gr.Row():
                    prev_btn = gr.Button("Previous Chunk")
                    next_btn = gr.Button("Next Chunk")
                    total_chunks_output = gr.Number(label="Total Chunks", interactive=False)
                # Hidden per-session state holding the precomputed next chunk
                precomputed_next = gr.State(value=None)
                btn_viz = gr.Button("Visualize")
                btn_viz.click(
                    fn=visualize_logprobs,
                    inputs=[json_input_viz, chunk],
                    outputs=[plot_output, table_output, text_output, alt_viz_output, drops_output, total_chunks_output, chunk],
                )
                def precompute_next_chunk(json_input, current_chunk):
                    # Chunk size is fixed at 100 to match visualize_logprobs' default
                    return precompute_chunk(json_input, 100, current_chunk)
                def update_chunk(json_input, current_chunk, action, precomputed=None):
                    # Recompute total chunks so navigation can be clamped at both ends
                    total_chunks = visualize_logprobs(json_input, 0)[5]
                    if action == "prev" and current_chunk > 0:
                        current_chunk -= 1
                    elif action == "next" and current_chunk < total_chunks - 1:
                        current_chunk += 1
                    if precomputed and all(x is not None for x in precomputed):
                        logger.debug("Precomputed data available for chunk %d", current_chunk)
                    # Re-render the target chunk (the precomputed data only serves as a warm-up)
                    return visualize_logprobs(json_input, current_chunk)
                prev_btn.click(
                    fn=update_chunk,
                    inputs=[json_input_viz, chunk, gr.State(value="prev"), precomputed_next],
                    outputs=[plot_output, table_output, text_output, alt_viz_output, drops_output, total_chunks_output, chunk],
                )
                next_btn.click(
                    fn=update_chunk,
                    inputs=[json_input_viz, chunk, gr.State(value="next"), precomputed_next],
                    outputs=[plot_output, table_output, text_output, alt_viz_output, drops_output, total_chunks_output, chunk],
                )
                def trigger_precomputation(json_input, current_chunk):
                    # Return the precomputed tuple so Gradio stores it in the session state;
                    # assigning to precomputed_next.value directly would not persist per session
                    try:
                        return precompute_next_chunk(json_input, current_chunk)
                    except Exception as e:
                        logger.error("Precomputation trigger failed: %s", str(e))
                        return None
                chunk.change(
                    fn=trigger_precomputation,
                    inputs=[json_input_viz, chunk],
                    outputs=[precomputed_next],
                )
    # Launch the Gradio application
    app.launch()
except Exception as e:
    logger.error("Application startup failed: %s", str(e))
    raise
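# To run locally (assuming this file is saved as app.py, the usual Spaces layout, with
# gradio, plotly, pandas, and numpy installed):
#   python app.py
# then open the URL Gradio prints (http://127.0.0.1:7860 by default).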