Spaces:
Running
on
Zero
Running
on
Zero
| import re | |
| import html | |
| import json | |
| from typing import Dict, List, Tuple, Optional, Any, Union | |
class ContextProcessor:
    """Processes highlighted contexts for the RAG Summarizer Arena.

    Every method is a stateless static helper.  Raw context text (optionally
    containing highlight markers or a markdown table) is turned into an
    HTML-escaped fragment in which highlighted spans are wrapped in
    ``<span class="highlight">``.
    """

    # Entity prefixes that may arrive without their terminating ';', mapped
    # to the completed entity.
    INCOMPLETE_ENTITIES = {
        '&#x27': '&#x27;',
        '&quot': '&quot;',
        '&lt': '&lt;',
        '&gt': '&gt;',
        '&amp': '&amp;',
    }

    # (opening tag, closing tag) for every supported highlight notation.
    HIGHLIGHT_TAG_PAIRS = (
        ('[[start_highlight]]', '[[end_highlight]]'),
        ('[[highlight_start]]', '[[highlight_end]]'),
        ('<span class="highlight">', '</span>'),
    )

    # Backslash escapes decoded by clean_text.  They are decoded in a single
    # left-to-right pass so that an escaped backslash followed by 'n' stays a
    # literal backslash + 'n' instead of turning into a newline.
    _ESCAPE_SEQUENCE_RE = re.compile(r'\\(["\'nt\\])')
    _ESCAPE_SEQUENCE_MAP = {'"': '"', "'": "'", 'n': '\n', 't': '\t', '\\': '\\'}

    # Typographic quotes, backtick and acute accent -> plain ASCII quotes.
    _QUOTE_TRANSLATION = str.maketrans({
        '\u201c': '"',
        '\u201d': '"',
        '\u2018': "'",
        '\u2019': "'",
        '`': "'",
        '\u00b4': "'",
    })

    # Characters a markdown header-separator row may consist of.
    _SEPARATOR_ROW_CHARS = frozenset('|-: \t')

    @staticmethod
    def clean_text(text: str) -> str:
        """Fix HTML entities and decode escaped characters in ``text``.

        Steps, in order: complete entities that lack their ';', unescape HTML
        entities, decode the backslash escapes ``\\"``, ``\\'``, ``\\n``,
        ``\\t`` and ``\\\\``, normalise typographic quotes to ASCII quotes,
        and finally drop trailing whitespace plus trailing backslashes when
        the text ends in a dangling backslash.

        Falsy or non-string input is returned unchanged.
        """
        if not text or not isinstance(text, str):
            return text

        for incomplete, complete in ContextProcessor.INCOMPLETE_ENTITIES.items():
            text = re.sub(f"{re.escape(incomplete)}(?!;)", complete, text)

        text = html.unescape(text)

        escape_map = ContextProcessor._ESCAPE_SEQUENCE_MAP
        text = ContextProcessor._ESCAPE_SEQUENCE_RE.sub(
            lambda match: escape_map[match.group(1)], text
        )
        text = text.translate(ContextProcessor._QUOTE_TRANSLATION)

        # A dangling backslash is the remnant of a truncated escape sequence.
        stripped = text.rstrip()
        if stripped.endswith('\\'):
            text = stripped.rstrip('\\')
        return text

    @staticmethod
    def balance_highlight_tags(text: str) -> str:
        """Make opening and closing highlight tags occur equally often.

        For each tag pair, missing closing tags are appended to the end of
        the text and missing opening tags are prepended to its start.
        Falsy or non-string input is returned unchanged.
        """
        if not text or not isinstance(text, str):
            return text

        for start_tag, end_tag in ContextProcessor.HIGHLIGHT_TAG_PAIRS:
            surplus_starts = text.count(start_tag) - text.count(end_tag)
            if surplus_starts > 0:
                text += end_tag * surplus_starts
            elif surplus_starts < 0:
                text = start_tag * (-surplus_starts) + text
        return text

    @staticmethod
    def balance_quotes(text: str) -> str:
        """Append a closing ``"`` when the text has an odd number of them.

        Backslash-escaped quotes (``\\"``) are not counted.  Falsy or
        non-string input is returned unchanged.
        """
        if not text or not isinstance(text, str):
            return text

        if text.replace('\\"', '').count('"') % 2 == 1:
            text += '"'
        return text

    @staticmethod
    def extract_highlight_parts(text: str) -> List[Tuple[bool, str]]:
        """Split ``text`` into ordered ``(is_highlighted, fragment)`` parts.

        Highlight tags are balanced first.  The tags themselves are removed
        from highlighted fragments.  When spans of different tag styles
        overlap or nest, the span that starts first wins and the overlapping
        one is ignored, so no text is emitted twice.

        Text without any highlight yields ``[(False, text)]``.
        """
        text = ContextProcessor.balance_highlight_tags(text)

        spans = []
        for start_tag, end_tag in ContextProcessor.HIGHLIGHT_TAG_PAIRS:
            pattern = re.escape(start_tag) + "(.*?)" + re.escape(end_tag)
            for match in re.finditer(pattern, text, re.DOTALL):
                spans.append((match.start(), match.end(), match.group(1)))

        if not spans:
            return [(False, text)]

        spans.sort(key=lambda span: span[0])

        parts: List[Tuple[bool, str]] = []
        cursor = 0
        for start, end, content in spans:
            if start < cursor:
                # Overlaps a span that was already emitted.
                continue
            if start > cursor:
                parts.append((False, text[cursor:start]))
            parts.append((True, content))
            cursor = end

        if cursor < len(text):
            parts.append((False, text[cursor:]))
        return parts

    @staticmethod
    def is_markdown_table(text: str) -> bool:
        """Return True when at least two lines of ``text`` start with ``|``."""
        if not text or not isinstance(text, str):
            return False
        if '|' not in text or '\n' not in text:
            return False
        lines = text.strip().split('\n')
        return sum(1 for line in lines if line.strip().startswith('|')) >= 2

    @staticmethod
    def _render_parts(parts: List[Tuple[bool, str]]) -> str:
        """HTML-escape every part and wrap highlighted ones in a span."""
        return "".join(
            f'<span class="highlight">{html.escape(part)}</span>'
            if is_highlighted else html.escape(part)
            for is_highlighted, part in parts
        )

    @staticmethod
    def process_cell_content(cell_text: str) -> str:
        """Render one table cell as escaped HTML, honouring highlights."""
        cell_text = ContextProcessor.clean_text(cell_text)
        cell_text = ContextProcessor.balance_quotes(cell_text)
        # extract_highlight_parts returns a single plain part when the cell
        # holds no highlight tags, so no separate "has highlights" check is
        # needed.
        return ContextProcessor._render_parts(
            ContextProcessor.extract_highlight_parts(cell_text)
        )

    @staticmethod
    def _split_table_row(line: str) -> List[str]:
        """Split a ``|``-delimited row into stripped cell strings.

        The empty strings produced by a leading and a trailing pipe are
        dropped.
        """
        cells = [cell.strip() for cell in line.split('|')]
        if cells and not cells[0]:
            cells.pop(0)
        if cells and not cells[-1]:
            cells.pop()
        return cells

    @staticmethod
    def _render_table_row(line: str, cell_tag: str) -> str:
        """Render one table line as ``<tr>`` using ``cell_tag`` (th or td)."""
        rendered_cells = "".join(
            f'<{cell_tag}>{ContextProcessor.process_cell_content(cell)}</{cell_tag}>'
            for cell in ContextProcessor._split_table_row(line)
        )
        return f'<tr>{rendered_cells}</tr>'

    @staticmethod
    def _is_separator_row(line: str) -> bool:
        """Return True for a header separator such as ``|---|:---:|``.

        The row must contain ``---`` and nothing but pipes, dashes, colons
        and whitespace, so a data row that merely contains ``---`` is not
        mistaken for a separator and dropped.
        """
        stripped = line.strip()
        return '---' in stripped and set(stripped) <= ContextProcessor._SEPARATOR_ROW_CHARS

    @staticmethod
    def convert_table_to_html(text: str) -> str:
        """Convert a markdown table to HTML, honouring highlights in cells.

        Only lines starting with ``|`` are treated as table rows.  With fewer
        than two such lines the text is rendered by ``process_text`` instead.
        When the second row is a separator, the first row becomes the
        ``<thead>`` and the separator is skipped; otherwise every row is a
        body row.
        """
        text = ContextProcessor.clean_text(text)

        table_lines = [
            line for line in text.strip().split('\n')
            if line.strip().startswith('|')
        ]
        if len(table_lines) < 2:
            return ContextProcessor.process_text(text)

        render_row = ContextProcessor._render_table_row
        pieces = ['<table class="md-table">']
        if ContextProcessor._is_separator_row(table_lines[1]):
            pieces.append('<thead>')
            pieces.append(render_row(table_lines[0], 'th'))
            pieces.append('</thead>')
            body_lines = table_lines[2:]
        else:
            body_lines = table_lines

        pieces.append('<tbody>')
        pieces.extend(render_row(line, 'td') for line in body_lines)
        pieces.append('</tbody>')
        pieces.append('</table>')
        return "".join(pieces)

    @staticmethod
    def process_text(text: str) -> str:
        """Render plain (non-table) text as escaped HTML with highlights."""
        text = ContextProcessor.clean_text(text)
        text = ContextProcessor.balance_quotes(text)
        text = ContextProcessor.balance_highlight_tags(text)
        return ContextProcessor._render_parts(
            ContextProcessor.extract_highlight_parts(text)
        )

    @staticmethod
    def _needs_abbreviated(content: str) -> bool:
        """Return True when ``content`` looks truncated or malformed.

        Triggers: the literal text "In Oklahoma,", a single opening quote, a
        trailing backslash, an odd number of unescaped quotes, or unbalanced
        highlight tags.
        """
        stripped = content.strip()
        if stripped == "In Oklahoma,":
            return True
        if stripped.startswith('"') and content.count('"') == 1:
            return True
        if content.rstrip().endswith('\\'):
            return True
        if content.replace('\\"', '').count('"') % 2 == 1:
            return True
        return any(
            content.count(start_tag) != content.count(end_tag)
            for start_tag, end_tag in ContextProcessor.HIGHLIGHT_TAG_PAIRS
        )

    @staticmethod
    def process_content(content: str, abbreviated_content: Optional[str] = None) -> str:
        """Render any context content (text or markdown table) as HTML.

        Args:
            content: Raw context text.  Falsy or non-string values yield "".
            abbreviated_content: Optional replacement used when ``content``
                looks truncated or malformed.  It may be plain text or a JSON
                object string carrying an ``"abbreviatedContent"`` field.
                Non-string values are ignored, as is a JSON envelope whose
                ``"abbreviatedContent"`` is not a string.

        Returns:
            The HTML fragment for the chosen content.
        """
        if not content or not isinstance(content, str):
            return ""

        if (
            abbreviated_content
            and isinstance(abbreviated_content, str)
            and ContextProcessor._needs_abbreviated(content)
        ):
            replacement: Optional[str] = abbreviated_content
            envelope = abbreviated_content.strip()
            if envelope.startswith('{') and envelope.endswith('}'):
                try:
                    data = json.loads(abbreviated_content)
                except json.JSONDecodeError:
                    data = None
                if isinstance(data, dict) and "abbreviatedContent" in data:
                    inner = data["abbreviatedContent"]
                    # A non-string payload cannot be rendered; keep the
                    # original content in that case.
                    replacement = inner if isinstance(inner, str) else None

            if replacement is not None:
                replacement = ContextProcessor.clean_text(replacement)
                replacement = ContextProcessor.balance_quotes(replacement)
                replacement = ContextProcessor.balance_highlight_tags(replacement)
                content = replacement

        if ContextProcessor.is_markdown_table(content):
            return ContextProcessor.convert_table_to_html(content)
        return ContextProcessor.process_text(content)

    @staticmethod
    def _extract_contexts_with_regex(context_json: str) -> List[Dict[str, Any]]:
        """Recover context dicts from malformed JSON text with regexes.

        ``"type"`` values (primary/secondary) and ``"abbreviatedContent"``
        values are collected independently and paired up in order of
        appearance.
        """
        types = re.findall(r'"type":\s*"(primary|secondary)"', context_json)
        contents = re.findall(
            r'"abbreviatedContent":\s*"((?:\\.|[^"])*?)"', context_json
        )
        return [
            {'type': ctx_type, 'abbreviatedContent': content.replace('\\"', '"')}
            for ctx_type, content in zip(types, contents)
        ]

    @staticmethod
    def parse_json_contexts(context_json: str) -> List[Dict[str, Any]]:
        """Parse JSON-formatted context data, falling back to regex extraction.

        An already-parsed list is returned as is.  Valid JSON that is not a
        list, and input that is not JSON text at all (for example ``None``),
        yield an empty list.  A string that fails to parse as JSON is mined
        with regular expressions instead.
        """
        if isinstance(context_json, list):
            return context_json

        try:
            parsed = json.loads(context_json)
        except json.JSONDecodeError:
            if not isinstance(context_json, str):
                return []
            return ContextProcessor._extract_contexts_with_regex(context_json)
        except (TypeError, UnicodeDecodeError):
            return []
        return parsed if isinstance(parsed, list) else []

    @staticmethod
    def process_json_contexts(context_json: str) -> List[Dict[str, Any]]:
        """Process JSON-formatted highlighted contexts into display records.

        Returns one dict per dict item with the keys ``chunk_num`` (1-based
        position in the parsed list), ``content`` (rendered HTML) and
        ``is_primary``.  Non-dict items are skipped.  Any error is printed
        and the records built so far are returned.
        """
        processed_contexts = []
        try:
            contexts = ContextProcessor.parse_json_contexts(context_json)
            for chunk_num, item in enumerate(contexts, start=1):
                if not isinstance(item, dict):
                    continue
                processed_contexts.append({
                    'chunk_num': chunk_num,
                    'content': ContextProcessor.process_content(
                        item.get('abbreviatedContent', '')
                    ),
                    'is_primary': item.get('type', 'secondary') == 'primary',
                })
        except Exception as e:
            # Best-effort boundary: report the problem and keep what we have.
            print(f"Error processing JSON contexts: {e}")
        return processed_contexts
# Module-level functions for backward compatibility


def clean_text(text):
    """Delegate to ``ContextProcessor.clean_text``."""
    return ContextProcessor.clean_text(text)


def balance_highlight_tags(text):
    """Delegate to ``ContextProcessor.balance_highlight_tags``."""
    return ContextProcessor.balance_highlight_tags(text)


def balance_quotes(text):
    """Delegate to ``ContextProcessor.balance_quotes``."""
    return ContextProcessor.balance_quotes(text)


def extract_highlight_parts(text):
    """Delegate to ``ContextProcessor.extract_highlight_parts``."""
    return ContextProcessor.extract_highlight_parts(text)


def is_markdown_table(text):
    """Delegate to ``ContextProcessor.is_markdown_table``."""
    return ContextProcessor.is_markdown_table(text)


def process_cell_content(cell_text):
    """Delegate to ``ContextProcessor.process_cell_content``."""
    return ContextProcessor.process_cell_content(cell_text)


def convert_table_to_html(text):
    """Delegate to ``ContextProcessor.convert_table_to_html``."""
    return ContextProcessor.convert_table_to_html(text)


def process_text(text):
    """Delegate to ``ContextProcessor.process_text``."""
    return ContextProcessor.process_text(text)


def process_content(content, abbreviated_content=None):
    """Delegate to ``ContextProcessor.process_content``."""
    return ContextProcessor.process_content(content, abbreviated_content)


def process_highlights(text):
    """Main entry point called from data_loader.py"""
    return ContextProcessor.process_content(text)
def get_context_html(example, show_full=False):
    """Format context chunks into HTML for display.

    Args:
        example: Mapping describing one example.  Keys read here:
            ``insufficient`` / ``insufficient_reason`` (optional warning
            banner), ``full_contexts``, ``contexts`` (lists of mappings with
            ``content``, optional ``abbreviatedContent`` and ``is_primary``)
            and ``contexts_highlighted`` (JSON-formatted contexts).
        show_full: When true and ``full_contexts`` is non-empty, render the
            full contexts instead of the regular ones.

    Returns:
        An HTML string: the optional "Insufficient Context" alert followed by
        a ``context-items-container`` div holding one ``context-item`` div
        per chunk, or a "No context available" placeholder.
    """
    fragments = []

    if example.get("insufficient", False):
        insufficient_reason = example.get("insufficient_reason", "")
        if insufficient_reason:
            # The reason comes from the example data, so it is escaped like
            # every other piece of content rendered by this module.
            reason_html = f"<p>{html.escape(str(insufficient_reason))}</p>"
        else:
            reason_html = (
                "<p>The context may not contain enough information to fully answer the question, "
                "or the question might be ambiguous. Models should ideally indicate this limitation "
                "or refuse to answer.</p>"
            )
        fragments.append(f"""
        <div class="insufficient-alert">
            <strong>
                <svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" viewBox="0 0 24 24" fill="none"
                     stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
                     style="vertical-align: middle; margin-right: 5px;">
                    <path d="m21.73 18-8-14a2 2 0 0 0-3.48 0l-8 14A2 2 0 0 0 4 21h16a2 2 0 0 0 1.73-3Z"></path>
                    <line x1="12" y1="9" x2="12" y2="13"></line>
                    <line x1="12" y1="17" x2="12.01" y2="17"></line>
                </svg>
                Insufficient Context
            </strong>
            {reason_html}
        </div>
        """)

    fragments.append('<div class="context-items-container">')

    def render_item(context_item, mark_primary):
        """Render one context mapping as a ``context-item`` div."""
        processed = ContextProcessor.process_content(
            context_item.get('content', ''),
            context_item.get('abbreviatedContent', None),
        )
        is_primary = mark_primary and context_item.get('is_primary', False)
        extra_class = " primary-context" if is_primary else ""
        return f'<div class="context-item{extra_class}">{processed}</div>'

    if show_full and example.get("full_contexts"):
        # Full contexts never carry the primary-context marker.
        fragments.extend(
            render_item(item, mark_primary=False)
            for item in example["full_contexts"]
        )
    elif example.get("contexts"):
        fragments.extend(
            render_item(item, mark_primary=True)
            for item in example["contexts"]
        )
    elif example.get("contexts_highlighted"):
        # JSON-structured highlighted contexts arrive already rendered.
        for context_item in ContextProcessor.process_json_contexts(
            example["contexts_highlighted"]
        ):
            extra_class = " primary-context" if context_item.get('is_primary', False) else ""
            fragments.append(
                f'<div class="context-item{extra_class}">{context_item["content"]}</div>'
            )
    else:
        fragments.append(
            '<div class="context-item">No context available. Try toggling to full context view.</div>'
        )

    fragments.append('</div>')
    return "".join(fragments)