ThreatLevelD
Upgrade EILProcessor to world-class signal normalization: adds subphrase/keyword blend detection, chunk weighting by model confidence, negation/contrast handling, emotion arc trajectory output, and sentiment-to-emotion mapping for non-EI language. Significantly improves long-form and ambiguous emotional inference.
9b2f3b7
| # core/eil_processor.py | |
| # MEC EIL Processor – World-Class Signal Normalization Edition | |
| import yaml | |
| import re | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| import torch | |
| import torch.nn.functional as F | |
class EILProcessor:
    """Infer an emotion packet from free-form text.

    ``infer_emotion`` tries, in order: an exact story-pattern lookup, story
    mode (split the text into chunks and aggregate the per-chunk results), an
    exact crosswalk lookup, an exact alias lookup, keyword blend detection
    with negation handling, sentiment-gated cue regexes, and finally a
    transformer emotion classifier.

    Every path returns a dict ("packet") with the keys ``phrases``,
    ``emotion_candidates``, ``metadata``, ``emotion_family``,
    ``primary_emotion_code``, ``arc``, ``resonance``, ``blend`` and
    ``trajectory``; the model path additionally carries ``confidence``.
    """

    # Lead-in phrases removed from the start of an input. Order matters: the
    # first match wins, so "i am feeling " must be tried before "i am ".
    _LEAD_INS = (
        "i am feeling ",
        "i feel ",
        "feeling ",
        "i'm feeling ",
        "i am ",
        "i'm ",
    )
    _SENTENCE_PUNCT_RE = re.compile(r'[.!?]')
    _TRAILING_PUNCT_RE = re.compile(r'[.!?\s]+$')
    # Word markers are matched on word boundaries so that e.g. "butter" does
    # not count as the conjunction "but".
    _CLAUSE_MARKER_RE = re.compile(
        r"[,;.!?]|\b(?:but|because|so that|which|when|while)\b",
        flags=re.IGNORECASE,
    )
    _CHUNK_SPLIT_RE = re.compile(
        r'[.,;!?]|\b(?:and|but|because|so|although|though|while|when)\b',
        flags=re.IGNORECASE,
    )
    _NEGATORS = r"(?:not|no longer|never|no|without)"
    # Optional simple inflection so "loves"/"fears"/"overwhelming" still match
    # their base keyword once matching is anchored on word boundaries.
    _INFLECTION = r"(?:s|es|d|ed|ing)?"
    # NOTE(review): 512 is the usual RoBERTa input limit — confirm against the
    # loaded model configs.
    _MAX_MODEL_TOKENS = 512
    # NOTE(review): labels the emotion model emits that are missing here fall
    # back to FAM-NEU — verify this map against the model's id2label.
    _MODEL_LABEL_TO_CODEX = {
        "joy": "FAM-JOY",
        "anger": "FAM-ANG",
        "sadness": "FAM-SAD",
        "fear": "FAM-FEA",
        "love": "FAM-LOV",
        "surprise": "FAM-SUR",
        "disgust": "FAM-DIS",
        "neutral": "FAM-NEU",
    }

    def __init__(self, codex_informer, softmax_threshold=0.6,
                 crosswalk_path='config/crosswalk.yaml'):
        """Load lookups, keyword/cue tables and the two transformer models.

        Args:
            codex_informer: Object providing ``build_alias_lookup()`` and
                ``resolve_emotion_family(code)``.
            softmax_threshold: Minimum top-class probability for the emotion
                model's prediction to be kept; below it the label becomes
                'neutral'.
            crosswalk_path: YAML file with a required ``crosswalk`` list
                (``phrase``/``emotion_code`` entries) and an optional
                ``story_patterns`` list (``pattern``/``emotion_code``).
        """
        self.codex_informer = codex_informer
        self.softmax_threshold = softmax_threshold

        # Build alias lookup from Codex
        self.alias_lookup = self.codex_informer.build_alias_lookup()
        print(f"[EILProcessor] Alias map loaded with {len(self.alias_lookup)} entries")

        # Load crosswalk.yaml
        with open(crosswalk_path, 'r', encoding='utf-8') as f:
            yaml_data = yaml.safe_load(f)
        crosswalk_data = yaml_data['crosswalk']
        # "or []" also covers a key that is present but explicitly null.
        story_pattern_data = yaml_data.get('story_patterns') or []

        # Keys are normalized so they compare equal to normalized user input.
        self.crosswalk_lookup = {
            self.normalize_text(entry['phrase']): entry['emotion_code']
            for entry in crosswalk_data
        }
        self.story_patterns_lookup = {
            self.normalize_text(entry['pattern']): entry['emotion_code']
            for entry in story_pattern_data
        }
        print(f"[EILProcessor] Crosswalk loaded with {len(self.crosswalk_lookup)} entries")
        print(f"[EILProcessor] Story Patterns loaded with {len(self.story_patterns_lookup)} entries")

        # Emotion keyword dictionary for signal normalization/blending
        self.emotion_keyword_map = {
            "FAM-ANG": ["anger", "angry", "hate", "furious", "rage", "resentment"],
            "FAM-HEL": ["helpless", "powerless", "can't", "unable", "trapped", "stuck", "overwhelmed", "overwhelm"],
            "FAM-SAD": ["sad", "down", "unhappy", "miserable", "depressed", "blue", "empty"],
            "FAM-FEA": ["afraid", "scared", "fear", "terrified", "worried", "nervous", "anxious", "can't sleep"],
            "FAM-LOV": ["love", "loved", "loving", "caring", "affection", "proud"],
            "FAM-JOY": ["joy", "happy", "excited", "delighted", "content", "proud"],
            "FAM-SUR": ["surprised", "amazed", "astonished", "shocked"],
            "FAM-DIS": ["disgust", "disgusted", "gross", "revolted"],
            "FAM-SHA": ["ashamed", "shame", "embarrassed", "humiliated"],
            "FAM-GUI": ["guilty", "guilt", "remorse", "regret"],
            # Add more as needed
        }

        # For sentiment-to-emotion mapping of ambiguous/indirect language:
        # (sentiment label, cue regex, mapped emotion family)
        self.sentiment_cue_map = [
            ("negative", r"can.?t sleep|insomnia|restless|wake up", "FAM-FEA"),
            ("negative", r"too much|overwhelmed|can.?t cope|can.?t deal", "FAM-HEL"),
            ("negative", r"nothing feels right|empty|pointless|no purpose", "FAM-SAD"),
            ("negative", r"don't care|apathy|numb", "FAM-LON"),
            ("positive", r"did it|proud|relieved", "FAM-JOY"),
            ("neutral", r"just tired|exhausted", "FAM-HEL"),
            # ...add more for coverage
        ]

        # Load emotion and sentiment models
        self.tokenizer = AutoTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-emotion')
        self.model = AutoModelForSequenceClassification.from_pretrained('cardiffnlp/twitter-roberta-base-emotion')
        self.sentiment_tokenizer = AutoTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')
        self.sentiment_model = AutoModelForSequenceClassification.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')

    # ------------------------------------------------------------------
    # Text helpers
    # ------------------------------------------------------------------
    def _strip_lead_in(self, text):
        """Lower-case and trim *text*, then drop one leading "I feel"-style phrase."""
        lowered = text.lower().strip()
        for lead_in in self._LEAD_INS:
            if lowered.startswith(lead_in):
                return lowered[len(lead_in):]
        return lowered

    def normalize_text(self, text):
        """Return the lookup form of *text*.

        The text is lower-cased and trimmed, at most one lead-in phrase
        ("i am feeling ", "i feel ", ...) is removed from the start, and every
        '.', '!' and '?' is deleted.
        """
        return self._SENTENCE_PUNCT_RE.sub('', self._strip_lead_in(text))

    def is_story_input(self, text):
        """Return True when *text* should be analysed chunk by chunk.

        That is the case when it has more than 12 whitespace-separated tokens
        or contains a clause marker: ',', ';', '.', '!', '?' or one of the
        words but / because / so that / which / when / while (whole words).
        """
        if len(text.split()) > 12:
            return True
        return self._CLAUSE_MARKER_RE.search(text) is not None

    def chunk_story(self, text):
        """Split *text* on punctuation and common conjunctions.

        Returns the non-empty, whitespace-trimmed pieces in order.
        """
        pieces = (piece.strip() for piece in self._CHUNK_SPLIT_RE.split(text) if piece)
        return [piece for piece in pieces if piece]

    def detect_emotion_blend_with_negation(self, norm_text):
        """Count emotion keywords per family, ignoring negated keywords.

        A keyword adds 1.0 to its family when it occurs in *norm_text* as a
        whole word (optionally with a simple s/es/d/ed/ing suffix). A keyword
        that appears anywhere directly after "not", "no longer", "never",
        "no" or "without" is skipped entirely.

        Returns:
            dict mapping family code to raw (un-normalized) score; empty when
            nothing matched.
        """
        blend = {}
        for fam, keywords in self.emotion_keyword_map.items():
            for kw in keywords:
                word = rf"\b{re.escape(kw)}{self._INFLECTION}\b"
                if re.search(rf"\b{self._NEGATORS}\s+{word}", norm_text):
                    continue
                if re.search(word, norm_text):
                    blend[fam] = blend.get(fam, 0) + 1.0
        return blend

    def get_sentiment(self, norm_text):
        """Run the sentiment model and return ``(label_lowercase, probability)``."""
        # Truncate so very long inputs cannot exceed the model's input limit.
        tokens = self.sentiment_tokenizer(
            norm_text,
            return_tensors='pt',
            truncation=True,
            max_length=self._MAX_MODEL_TOKENS,
        )
        with torch.no_grad():
            logits = self.sentiment_model(**tokens).logits
        probs = F.softmax(logits, dim=-1).squeeze()
        top_prob, top_idx = torch.max(probs, dim=-1)
        sentiment_label = self.sentiment_model.config.id2label[top_idx.item()]
        return sentiment_label.lower(), top_prob.item()

    # ------------------------------------------------------------------
    # Packet construction
    # ------------------------------------------------------------------
    def _make_packet(self, *, phrases, candidates, source, input_type,
                     emotion_family, primary_emotion_code, arc, resonance,
                     blend, trajectory, confidence=None):
        """Assemble the result dict shared by every inference path."""
        metadata = {'source': source, 'input_type': input_type}
        if confidence is not None:
            metadata['confidence'] = confidence
        packet = {
            'phrases': phrases,
            'emotion_candidates': candidates,
            'metadata': metadata,
            'emotion_family': emotion_family,
            'primary_emotion_code': primary_emotion_code,
            'arc': arc,
            'resonance': resonance,
            'blend': blend,
            'trajectory': trajectory,
        }
        if confidence is not None:
            packet['confidence'] = confidence
        return packet

    def _packet_from_codex(self, input_text, lookup_code, source, input_type, *,
                           candidate=None, blend=None, trajectory=None,
                           confidence=None):
        """Resolve *lookup_code* via the codex and build a single-phrase packet.

        ``candidate`` defaults to *lookup_code*; ``blend`` and ``trajectory``
        default to the resolved primary emotion code.
        """
        emotion_data = self.codex_informer.resolve_emotion_family(lookup_code)
        primary = emotion_data['primary_emotion_code']
        return self._make_packet(
            phrases=[input_text],
            candidates=[{
                'phrase': input_text,
                'candidate_emotion': lookup_code if candidate is None else candidate,
            }],
            source=source,
            input_type=input_type,
            emotion_family=emotion_data['emotion_family'],
            primary_emotion_code=primary,
            arc=emotion_data['arc'],
            resonance=emotion_data['resonance'],
            blend={primary: 1.0} if blend is None else blend,
            trajectory=[primary] if trajectory is None else trajectory,
            confidence=confidence,
        )

    # ------------------------------------------------------------------
    # Inference
    # ------------------------------------------------------------------
    def infer_emotion(self, input_text):
        """Infer the emotion packet for *input_text* (see class docstring)."""
        return self._infer_emotion(input_text, allow_story=True)

    def _infer_emotion(self, input_text, allow_story):
        """Run the inference cascade; *allow_story* gates the chunking step."""
        norm_text = self.normalize_text(input_text)

        # 1. Story pattern override
        if norm_text in self.story_patterns_lookup:
            primary_emotion_code = self.story_patterns_lookup[norm_text]
            packet = self._packet_from_codex(
                input_text, primary_emotion_code,
                'EILProcessor (story pattern)', 'story',
            )
            print(f"[EILProcessor] Story Pattern match: '{norm_text}' → {primary_emotion_code}")
            return packet

        # 2. Story detection (chunking and blend aggregation)
        input_type = 'phrase'
        if allow_story:
            # Detection and chunking need the sentence punctuation that
            # normalize_text removes; only trailing punctuation is dropped so
            # a plain "I'm sad." is still handled as a phrase.
            story_text = self._TRAILING_PUNCT_RE.sub('', self._strip_lead_in(input_text))
            if self.is_story_input(story_text):
                return self._infer_story(input_text, norm_text, story_text)

        # 3. Crosswalk check
        if norm_text in self.crosswalk_lookup:
            primary_emotion_code = self.crosswalk_lookup[norm_text]
            packet = self._packet_from_codex(
                input_text, primary_emotion_code,
                'EILProcessor (crosswalk)', input_type,
            )
            print(f"[EILProcessor] Crosswalk match: '{norm_text}' → {primary_emotion_code}")
            return packet

        # 4. Alias lookup
        if norm_text in self.alias_lookup:
            variant_code = self.alias_lookup[norm_text]
            # Takes the second '-'-separated field of the variant code as the
            # family suffix — assumes every alias code contains a '-'
            # (TODO confirm against the codex).
            family_code = f"FAM-{variant_code.split('-')[1]}"
            print(f"[EILProcessor] Alias match: '{norm_text}' → {variant_code}")
            return self._make_packet(
                phrases=[input_text],
                candidates=[{'phrase': input_text, 'candidate_emotion': variant_code}],
                source='EILProcessor (alias match)',
                input_type=input_type,
                emotion_family=family_code,
                primary_emotion_code=variant_code,
                arc='Pending',
                resonance='Pending',
                blend={variant_code: 1.0},
                trajectory=[variant_code],
            )

        # 5. Signal normalization - blend detection & negation
        blend = self.detect_emotion_blend_with_negation(norm_text)
        if blend:
            total = sum(blend.values())
            blend = {fam: score / total for fam, score in blend.items()}
            # max() keeps the first family on ties (dict insertion order).
            primary_code = max(blend, key=blend.get)
            packet = self._packet_from_codex(
                input_text, primary_code,
                'EILProcessor (signal normalization)', input_type,
                blend=blend, trajectory=[primary_code],
            )
            print(f"[EILProcessor] Signal normalization keyword blend: {blend} (primary: {primary_code})")
            return packet

        # 6. Sentiment-to-emotion mapping for non-EI language
        sentiment, sentiment_conf = self.get_sentiment(norm_text)
        print(f"[EILProcessor] Sentiment fallback: {sentiment} ({sentiment_conf:.2f})")
        for sent, cue, fam in self.sentiment_cue_map:
            if sent == sentiment and re.search(cue, norm_text):
                return self._packet_from_codex(
                    input_text, fam,
                    'EILProcessor (sentiment-to-emotion)', input_type,
                    blend={fam: 1.0}, trajectory=[fam],
                )

        # 7. Model fallback (last resort)
        return self._infer_with_model(input_text, norm_text, input_type)

    def _infer_story(self, input_text, norm_text, story_text):
        """Chunk *story_text*, infer each chunk, and aggregate into one packet."""
        print(f"[EILProcessor] Story mode activated for input: '{norm_text}'")
        # Chunks are analysed with story mode disabled: a chunk cannot be
        # split any further, so re-entering story mode could only recurse on
        # the same text forever.
        chunk_results = [
            self._infer_emotion(chunk, allow_story=False)
            for chunk in self.chunk_story(story_text)
        ]

        blend_accum = {}
        trajectory = []
        for sub_result in chunk_results:
            # Only model-derived packets carry a confidence; others weigh 1.0.
            conf = sub_result.get('confidence', 1.0)
            for fam, val in sub_result.get('blend', {}).items():
                blend_accum[fam] = blend_accum.get(fam, 0) + val * conf
            if 'primary_emotion_code' in sub_result:
                trajectory.append(sub_result['primary_emotion_code'])

        if blend_accum:
            total = sum(blend_accum.values())
            blend_accum = {fam: score / total for fam, score in blend_accum.items()}
            dominant_family = max(blend_accum, key=blend_accum.get)
        else:
            dominant_family = "FAM-NEU"
            blend_accum = {"FAM-NEU": 1.0}
            trajectory = ["FAM-NEU"]

        emotion_data = self.codex_informer.resolve_emotion_family(dominant_family)
        return self._make_packet(
            phrases=[input_text] + [r['phrases'][0] for r in chunk_results],
            candidates=[
                {
                    'phrase': r['phrases'][0],
                    'candidate_emotion': r.get('primary_emotion_code', 'FAM-NEU'),
                }
                for r in chunk_results
            ],
            source='EILProcessor (story mode)',
            input_type='story',
            emotion_family=emotion_data['emotion_family'],
            primary_emotion_code=emotion_data['primary_emotion_code'],
            arc=emotion_data['arc'],
            resonance=emotion_data['resonance'],
            blend=blend_accum,
            trajectory=trajectory,
        )

    def _infer_with_model(self, input_text, norm_text, input_type):
        """Classify *norm_text* with the emotion model and build the packet."""
        print(f"[EILProcessor] No crosswalk/alias/keyword/sentiment match — running model on: '{norm_text}'")
        # Truncate so very long inputs cannot exceed the model's input limit.
        tokens = self.tokenizer(
            norm_text,
            return_tensors='pt',
            truncation=True,
            max_length=self._MAX_MODEL_TOKENS,
        )
        with torch.no_grad():
            logits = self.model(**tokens).logits
        probs = F.softmax(logits, dim=-1).squeeze()
        top_prob, top_idx = torch.max(probs, dim=-1)
        predicted_label = self.model.config.id2label[top_idx.item()]
        confidence = top_prob.item()

        if confidence < self.softmax_threshold:
            predicted_label = 'neutral'
            print(f"[EILProcessor] Low confidence ({confidence:.2f}) — setting to 'neutral'")
        print(f"[EILProcessor] Model prediction: {predicted_label} ({confidence:.2f})")

        primary_emotion_code = self._MODEL_LABEL_TO_CODEX.get(predicted_label.lower(), "FAM-NEU")
        # The candidate keeps the raw model label, not the codex code.
        return self._packet_from_codex(
            input_text, primary_emotion_code,
            'EILProcessor (model)', input_type,
            candidate=predicted_label,
            confidence=confidence,
        )