import os
import re
import torch
import logging
import gc
import sys
import pwd  # Needed for the getpwuid monkey patch below
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Optional
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from tokenizers.normalizers import Sequence, Replace, Strip
from tokenizers import Regex
from huggingface_hub import hf_hub_download  # Reliable downloads from the Hugging Face Hub

# =====================================================
# 🛠️ Monkey Patch for Docker/Container UID Issue
# =====================================================
# Fix for 'getpwuid(): uid not found: 1000' in containerized environments
# where the running UID has no /etc/passwd entry.
original_getpwuid = pwd.getpwuid

def patched_getpwuid(uid_num):
    try:
        return original_getpwuid(uid_num)
    except KeyError:
        if uid_num == os.getuid():
            # Return a synthetic passwd entry for the current user.
            # struct_passwd is a struct sequence, so it is built from a
            # 7-tuple: (name, passwd, uid, gid, gecos, dir, shell).
            return pwd.struct_passwd(
                ('dockeruser', 'x', uid_num, os.getgid(),
                 'Docker User', '/tmp', '/bin/sh')
            )
        raise

pwd.getpwuid = patched_getpwuid

# Set fallback env vars to avoid user-dependent paths
os.environ.setdefault('HOME', '/tmp')
os.environ.setdefault('USER', 'dockeruser')
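
# Minimal sanity check for the patch (a sketch; safe to remove): resolving the
# current UID should now succeed even when the container has no passwd entry.
#
#     import pwd, os
#     pwd.getpwuid(os.getuid())  # returns the synthetic entry instead of raising KeyError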

# =====================================================
# 🔧 Environment configuration and settings
# =====================================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Memory and cache settings
CACHE_DIR = "/tmp/huggingface_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

# Hugging Face environment variables
os.environ.update({
    "HF_HOME": CACHE_DIR,
    "TRANSFORMERS_CACHE": CACHE_DIR,
    "HF_DATASETS_CACHE": CACHE_DIR,
    "HUGGINGFACE_HUB_CACHE": CACHE_DIR,
    "TORCH_HOME": CACHE_DIR,
    "TOKENIZERS_PARALLELISM": "false",  # Avoid threading issues
    "TRANSFORMERS_OFFLINE": "0",  # Allow downloading from the internet
})

# PyTorch memory settings
if torch.cuda.is_available():
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128'
    torch.backends.cudnn.benchmark = True

# =====================================================
# 🚀 Device selection (GPU or CPU)
# =====================================================
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
logger.info(f"🖥️ Using device: {device}")
if torch.cuda.is_available():
    logger.info(f"🎮 CUDA Device: {torch.cuda.get_device_name(0)}")
    logger.info(f"💾 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")

# =====================================================
# 📊 Model label mapping
# =====================================================
label_mapping = {
    0: '13B', 1: '30B', 2: '65B', 3: '7B', 4: 'GLM130B', 5: 'bloom_7b',
    6: 'bloomz', 7: 'cohere', 8: 'davinci', 9: 'dolly', 10: 'dolly-v2-12b',
    11: 'flan_t5_base', 12: 'flan_t5_large', 13: 'flan_t5_small',
    14: 'flan_t5_xl', 15: 'flan_t5_xxl', 16: 'gemma-7b-it', 17: 'gemma2-9b-it',
    18: 'gpt-3.5-turbo', 19: 'gpt-35', 20: 'gpt4', 21: 'gpt4o',
    22: 'gpt_j', 23: 'gpt_neox', 24: 'human', 25: 'llama3-70b', 26: 'llama3-8b',
    27: 'mixtral-8x7b', 28: 'opt_1.3b', 29: 'opt_125m', 30: 'opt_13b',
    31: 'opt_2.7b', 32: 'opt_30b', 33: 'opt_350m', 34: 'opt_6.7b',
    35: 'opt_iml_30b', 36: 'opt_iml_max_1.3b', 37: 't0_11b', 38: 't0_3b',
    39: 'text-davinci-002', 40: 'text-davinci-003'
}
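
# Note: index 24 ('human') is the only human class; every other index names an
# AI model family. classify_text below derives the binary AI/human split from
# the probability mass at this single index.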

# =====================================================
# 🤖 Model Manager
# =====================================================
class ModelManager:
    def __init__(self):
        self.tokenizer = None
        self.models = []
        self.models_loaded = False
        self.model_urls = [
            "https://huggingface.co/mihalykiss/modernbert_2/resolve/main/Model_groups_3class_seed12",
            "https://huggingface.co/mihalykiss/modernbert_2/resolve/main/Model_groups_3class_seed22"
        ]
        self.base_model_id = "answerdotai/ModernBERT-base"  # Primary
        self.fallback_model_id = "bert-base-uncased"  # Fallback if ModernBERT fails
        self.using_fallback = False

    def load_tokenizer(self):
        """Load the tokenizer, falling back to a secondary model if the primary fails."""
        try:
            logger.info(f"📝 Loading tokenizer from {self.base_model_id}...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.base_model_id,
                cache_dir=CACHE_DIR,
                use_fast=True,
                trust_remote_code=False
            )
            logger.info("✅ Primary tokenizer loaded successfully")
        except Exception as e:
            logger.warning(f"⚠️ Failed to load primary tokenizer: {e}")
            try:
                logger.info(f"🔄 Falling back to {self.fallback_model_id}...")
                self.tokenizer = AutoTokenizer.from_pretrained(
                    self.fallback_model_id,
                    cache_dir=CACHE_DIR,
                    use_fast=True,
                    trust_remote_code=False
                )
                self.using_fallback = True
                logger.info("✅ Fallback tokenizer loaded successfully")
            except Exception as fallback_e:
                logger.error(f"❌ Failed to load fallback tokenizer: {fallback_e}")
                return False

        # Configure the text normalizer: join words hyphenated across line
        # breaks, collapse newlines to spaces, then strip.
        try:
            newline_to_space = Replace(Regex(r'\s*\n\s*'), " ")
            join_hyphen_break = Replace(Regex(r'(\w+)-\s*\n\s*(\w+)'), r"\1\2")
            self.tokenizer.backend_tokenizer.normalizer = Sequence([
                self.tokenizer.backend_tokenizer.normalizer,
                join_hyphen_break,
                newline_to_space,
                Strip()
            ])
        except Exception as e:
            logger.warning(f"⚠️ Could not set custom normalizer: {e}")

        return True

    def load_single_model(self, model_url=None, model_path=None, model_name="Model"):
        """Load a single model, with fallback and thorough error handling."""
        base_model = None
        try:
            logger.info(f"🤖 Loading base {model_name} from {self.base_model_id}...")
            # Try the primary base model first
            base_model = AutoModelForSequenceClassification.from_pretrained(
                self.base_model_id,
                num_labels=41,
                cache_dir=CACHE_DIR,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                low_cpu_mem_usage=True,
                trust_remote_code=False
            )
            logger.info("✅ Primary base model loaded")
        except Exception as e:
            logger.warning(f"⚠️ Failed to load primary base model: {e}")
            try:
                logger.info(f"🔄 Falling back to {self.fallback_model_id}...")
                base_model = AutoModelForSequenceClassification.from_pretrained(
                    self.fallback_model_id,
                    num_labels=41,
                    cache_dir=CACHE_DIR,
                    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                    low_cpu_mem_usage=True,
                    trust_remote_code=False
                )
                self.using_fallback = True
                logger.info("✅ Fallback base model loaded (note: weights may not be compatible)")
            except Exception as fallback_e:
                logger.error(f"❌ Failed to load fallback base model: {fallback_e}")
                return None

        # Try to load the fine-tuned weights (only meaningful when the base
        # architecture matches; strict=False tolerates partial mismatches).
        try:
            if model_path and os.path.exists(model_path):
                logger.info(f"📁 Loading from local file: {model_path}")
                state_dict = torch.load(model_path, map_location=device, weights_only=True)
                base_model.load_state_dict(state_dict, strict=False)
            elif model_url:
                # Use hf_hub_download instead of torch.hub for HF repos
                logger.info("🌐 Downloading weights from HF repo...")
                repo_id = "mihalykiss/modernbert_2"
                filename = model_url.split("/")[-1]
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=filename,
                    cache_dir=CACHE_DIR
                )
                logger.info(f"✅ Downloaded to {local_path}")
                state_dict = torch.load(local_path, map_location=device, weights_only=True)
                base_model.load_state_dict(state_dict, strict=False)
            logger.info(f"✅ {model_name} weights loaded successfully")
        except Exception as e:
            logger.warning(f"⚠️ Could not load custom weights for {model_name}: {e}")
            logger.info("📌 Using base model without fine-tuned weights")

        # Move to the target device and switch to eval mode
        try:
            base_model = base_model.to(device)
            base_model.eval()
            logger.info(f"✅ {model_name} moved to {device} and set to eval mode")
            return base_model
        except Exception as e:
            logger.error(f"❌ Failed to prepare {model_name}: {e}")
            return None

    def load_models(self):
        """Load the tokenizer and all ensemble models."""
        if self.models_loaded:
            return True
        try:
            # Load the tokenizer
            if not self.load_tokenizer():
                return False

            # Load each model
            for i, model_url in enumerate(self.model_urls):
                model = self.load_single_model(
                    model_url=model_url,
                    model_name=f"Model {i+1}"
                )
                if model is None:
                    logger.warning(f"⚠️ Failed to load model {i+1}")
                    continue
                self.models.append(model)

            if len(self.models) == 0:
                logger.error("❌ No models loaded successfully")
                return False

            self.models_loaded = True
            logger.info(f"✅ Successfully loaded {len(self.models)} model(s)")
            return True
        except Exception as e:
            logger.error(f"❌ Model loading error: {e}", exc_info=True)
            return False

    def classify_text(self, text: str, max_length: int = 512) -> Dict:
        """Classify a text by ensemble-averaging the logits of all loaded models."""
        if not self.models_loaded or not self.tokenizer:
            raise RuntimeError("Models or tokenizer not loaded")
        try:
            # Tokenization
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                max_length=max_length,
                padding=True
            ).to(device)

            # Predict with every model in the ensemble
            all_logits = []
            with torch.no_grad():
                for model in self.models:
                    outputs = model(**inputs)
                    all_logits.append(outputs.logits)

            # Average the logits, then softmax
            avg_logits = torch.mean(torch.stack(all_logits), dim=0)
            probabilities = torch.nn.functional.softmax(avg_logits, dim=-1)

            # Top predictions
            top_probs, top_indices = torch.topk(probabilities[0], k=5)

            # AI vs Human probabilities (index 24 = 'human')
            human_prob = probabilities[0][24].item()
            ai_prob = 1.0 - human_prob

            # Predicted source model
            predicted_idx = top_indices[0].item()
            predicted_model = label_mapping.get(predicted_idx, "unknown")

            # Top 5 predictions
            top_5 = [
                {
                    "model": label_mapping.get(idx.item(), "unknown"),
                    "probability": prob.item()
                }
                for prob, idx in zip(top_probs, top_indices)
            ]

            return {
                "ai_percentage": round(ai_prob * 100, 2),
                "human_percentage": round(human_prob * 100, 2),
                "predicted_model": predicted_model,
                "top_5_predictions": top_5,
                "models_used": len(self.models),
                "using_fallback": self.using_fallback
            }
        except Exception as e:
            logger.error(f"Classification error: {e}", exc_info=True)
            raise
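
# Minimal usage sketch (assumes the weights can be downloaded from the Hub):
#
#     mm = ModelManager()
#     if mm.load_models():
#         scores = mm.classify_text("Some text to score...")
#         print(scores["ai_percentage"], scores["predicted_model"])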

# =====================================================
# 🆕 ADVANCED ACCURACY FEATURES
# =====================================================
def calculate_perplexity_score(text: str) -> float:
    """
    Lightweight lexical proxy for perplexity (complexity/predictability).
    AI text tends to be more predictable (lower score);
    human text tends to be more varied (higher score).
    Note: this is not true language-model perplexity; it combines
    word-length variance with the unique-word ratio.
    """
    words = text.split()
    if len(words) < 10:
        return 0.0

    # Word-length variance
    word_lengths = [len(w) for w in words]
    avg_length = sum(word_lengths) / len(word_lengths)
    variance = sum((l - avg_length) ** 2 for l in word_lengths) / len(word_lengths)

    # Unique-word ratio
    unique_ratio = len(set(words)) / len(words)

    # Combine metrics (normalized 0-1, higher = more human-like)
    perplexity = (variance / 20) * 0.5 + unique_ratio * 0.5
    return min(max(perplexity, 0), 1)

def analyze_sentence_structure(text: str) -> Dict:
    """
    Analyze sentence patterns.
    AI tends to have:
    - More uniform sentence lengths
    - Consistent punctuation patterns
    - Regular structure
    """
    sentences = re.split(r'[.!?]+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    if len(sentences) < 2:
        return {"uniformity": 0.5, "variance": 0.5}

    # Sentence lengths
    lengths = [len(s.split()) for s in sentences]
    avg_length = sum(lengths) / len(lengths)

    # Variance (low variance = more uniform = AI-like)
    variance = sum((l - avg_length) ** 2 for l in lengths) / len(lengths)
    uniformity = 1 / (1 + variance / 10)  # Normalize

    return {
        "uniformity": round(uniformity, 3),
        "variance": round(variance, 2),
        "avg_sentence_length": round(avg_length, 1),
        "sentence_count": len(sentences)
    }

def detect_repetition_patterns(text: str) -> Dict:
    """
    Detect repetitive patterns common in AI text.
    AI often repeats:
    - Similar phrases
    - Sentence structures
    - Transition words
    """
    words = text.lower().split()

    # Bigram repetition
    bigrams = [f"{words[i]} {words[i+1]}" for i in range(len(words) - 1)]
    bigram_repetition = 1 - (len(set(bigrams)) / len(bigrams)) if bigrams else 0

    # Trigram repetition
    trigrams = [f"{words[i]} {words[i+1]} {words[i+2]}" for i in range(len(words) - 2)]
    trigram_repetition = 1 - (len(set(trigrams)) / len(trigrams)) if trigrams else 0

    # Common AI transition phrases
    ai_phrases = [
        'furthermore', 'moreover', 'additionally', 'consequently',
        'in conclusion', 'to summarize', 'it is important to note',
        'it should be noted', 'in other words', 'as a result'
    ]
    ai_phrase_count = sum(1 for phrase in ai_phrases if phrase in text.lower())
    ai_phrase_density = ai_phrase_count / max(len(words) / 100, 1)  # per 100 words

    return {
        "bigram_repetition": round(bigram_repetition, 3),
        "trigram_repetition": round(trigram_repetition, 3),
        "ai_phrase_density": round(ai_phrase_density, 2),
        "ai_phrase_count": ai_phrase_count
    }

def analyze_vocabulary_richness(text: str) -> Dict:
    """
    Analyze vocabulary complexity.
    AI tends to:
    - Use more formal vocabulary
    - Use less slang/informal language
    - Use more technical terms
    """
    words = re.findall(r'\b[a-z]+\b', text.lower())
    if len(words) < 10:
        # Return the same keys as the full result so callers never KeyError
        return {
            "type_token_ratio": 0.5,
            "informal_markers": 0,
            "formal_markers": 0,
            "formality_score": 0.5,
            "unique_words": len(set(words))
        }

    # Type-token ratio (vocabulary diversity)
    ttr = len(set(words)) / len(words)

    # Informal markers (human-like)
    informal_markers = [
        'lol', 'omg', 'btw', 'tbh', 'imo', 'gonna', 'wanna', 'gotta',
        'yeah', 'nah', 'yep', 'nope', 'kinda', 'sorta', 'dunno'
    ]
    informal_count = sum(1 for marker in informal_markers if marker in words)

    # Formal markers (AI-like)
    formal_markers = [
        'furthermore', 'nevertheless', 'consequently', 'substantially',
        'primarily', 'significantly', 'comprehensive', 'fundamental',
        'demonstrate', 'facilitate', 'optimize', 'leverage'
    ]
    formal_count = sum(1 for marker in formal_markers if marker in words)

    # Formality score (0 = informal/human, 1 = formal/AI)
    formality = formal_count / max(formal_count + informal_count, 1)

    return {
        "type_token_ratio": round(ttr, 3),
        "informal_markers": informal_count,
        "formal_markers": formal_count,
        "formality_score": round(formality, 3),
        "unique_words": len(set(words))
    }

def detect_human_errors(text: str) -> Dict:
    """
    Detect common human typing patterns.
    Humans tend to produce:
    - Typos and spelling errors
    - Inconsistent punctuation
    - Emotional expressions
    """
    # Emotional markers (very human)
    emotions = ['!', '?', '!!', '???', '...', 'haha', 'lmao', 'wow']
    emotion_count = sum(text.lower().count(e) for e in emotions)

    # Repeated punctuation (human typo pattern)
    repeated_punct = len(re.findall(r'([!?.])\1+', text))

    # ALL CAPS words (emotional emphasis, human-like)
    caps_words = len(re.findall(r'\b[A-Z]{2,}\b', text))

    # Inconsistent spacing (human error)
    spacing_issues = len(re.findall(r'\s{2,}|[a-z][A-Z]', text))

    return {
        "emotion_markers": emotion_count,
        "repeated_punctuation": repeated_punct,
        "caps_emphasis": caps_words,
        "spacing_inconsistencies": spacing_issues,
        "human_error_score": round((emotion_count + repeated_punct + caps_words) / max(len(text.split()) / 50, 1), 2)
    }

def calculate_burstiness(text: str) -> float:
    """
    Calculate burstiness (variation in sentence/word patterns).
    AI: low burstiness (consistent)
    Human: high burstiness (varied, unpredictable)
    """
    sentences = re.split(r'[.!?]+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    if len(sentences) < 3:
        return 0.5

    lengths = [len(s.split()) for s in sentences]

    # Burstiness score: higher variance = more bursty = more human
    mean_length = sum(lengths) / len(lengths)
    variance = sum((l - mean_length) ** 2 for l in lengths) / len(lengths)
    burstiness = min(variance / 50, 1.0)  # Normalize
    return round(burstiness, 3)

def advanced_linguistic_analysis(text: str) -> Dict:
    """
    Comprehensive linguistic analysis combining all methods.
    Returns a confidence boost/penalty based on linguistic features.
    """
    try:
        perplexity = calculate_perplexity_score(text)
        structure = analyze_sentence_structure(text)
        repetition = detect_repetition_patterns(text)
        vocabulary = analyze_vocabulary_richness(text)
        human_errors = detect_human_errors(text)
        burstiness = calculate_burstiness(text)

        # AI likelihood from linguistic features (higher = more AI-like)
        ai_indicators = [
            structure["uniformity"],              # High uniformity = AI
            repetition["bigram_repetition"] * 2,  # High repetition = AI
            repetition["ai_phrase_density"] / 5,  # Many AI phrases = AI
            vocabulary["formality_score"],        # High formality = AI
            (1 - burstiness),                     # Low burstiness = AI
            (1 - perplexity),                     # Low perplexity = AI
        ]

        # Human likelihood from linguistic features
        human_indicators = [
            human_errors["human_error_score"],    # Errors = human
            vocabulary["informal_markers"] / 10,  # Informal = human
            burstiness,                           # High burstiness = human
            perplexity,                           # High perplexity = human
        ]

        linguistic_ai_score = sum(ai_indicators) / len(ai_indicators)
        linguistic_human_score = sum(human_indicators) / len(human_indicators)

        # Normalize to a 0-100 scale
        linguistic_ai_percentage = round(linguistic_ai_score * 100, 2)
        linguistic_human_percentage = round(linguistic_human_score * 100, 2)

        return {
            "linguistic_features": {
                "perplexity": perplexity,
                "sentence_structure": structure,
                "repetition_patterns": repetition,
                "vocabulary_analysis": vocabulary,
                "human_error_patterns": human_errors,
                "burstiness": burstiness
            },
            "linguistic_ai_score": linguistic_ai_percentage,
            "linguistic_human_score": linguistic_human_percentage,
            "confidence_modifier": {
                "ai_indicators_strength": round(linguistic_ai_score, 3),
                "human_indicators_strength": round(linguistic_human_score, 3),
                "combined_confidence": round(abs(linguistic_ai_score - linguistic_human_score), 3)
            }
        }
    except Exception as e:
        logger.warning(f"Advanced linguistic analysis failed: {e}")
        return {
            "linguistic_features": {},
            "linguistic_ai_score": 50,
            "linguistic_human_score": 50,
            "confidence_modifier": {"error": str(e)}
        }
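
# Interpretation note: the indicator weights above (e.g. bigram_repetition * 2,
# informal_markers / 10) are hand-tuned heuristics, not calibrated
# probabilities, so linguistic_ai_score and linguistic_human_score need not
# sum to 100 and are best read as relative signals.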

# =====================================================
# 🆕 CONTENT CLEANING & HALVES ANALYSIS
# =====================================================
def clean_content_for_analysis(text: str, min_line_length: int = 30) -> str:
    """
    Clean content by removing short lines (headlines, etc.).

    Args:
        text: Original text
        min_line_length: Minimum character length for a line to be kept (default: 30)

    Returns:
        Cleaned text containing only substantial content lines
    """
    lines = text.split('\n')
    cleaned_lines = []
    for line in lines:
        stripped = line.strip()
        # Keep lines at least min_line_length characters long
        if len(stripped) >= min_line_length:
            cleaned_lines.append(stripped)
    return ' '.join(cleaned_lines)
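
# Example: for "Breaking News\nThe committee voted to approve the measure after
# a long debate." only the second line survives the default 30-character cutoff.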

def split_content_in_half(text: str) -> tuple:
    """
    Split cleaned content into two halves at the middle word.

    Args:
        text: Cleaned text

    Returns:
        Tuple of (first_half, second_half)
    """
    words = text.split()
    mid_point = len(words) // 2
    first_half = ' '.join(words[:mid_point])
    second_half = ' '.join(words[mid_point:])
    return first_half, second_half
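
# Example: split_content_in_half("a b c d e") -> ("a b", "c d e")
# (the first half gets the shorter share when the word count is odd).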

def analyze_content_halves(model_manager, text: str, overall_result: Optional[Dict] = None) -> Dict:
    """
    Analyze text by splitting it into two halves after cleaning.
    Uses BOTH models for ensemble predictions on each half for improved accuracy,
    PLUS advanced linguistic analysis for enhanced confidence.
    """
    try:
        logger.info("🔬 Running advanced linguistic analysis...")
        linguistic_analysis = advanced_linguistic_analysis(text)

        cleaned_text = clean_content_for_analysis(text)
        if not cleaned_text or len(cleaned_text.split()) < 10:
            return {
                "halves_analysis_available": False,
                "reason": "Content too short after cleaning",
                "linguistic_analysis": linguistic_analysis
            }

        # Split text into halves
        first_half, second_half = split_content_in_half(cleaned_text)

        # Linguistic analysis for each half
        first_half_linguistic = advanced_linguistic_analysis(first_half)
        second_half_linguistic = advanced_linguistic_analysis(second_half)

        # Ensemble model predictions
        first_half_result = model_manager.classify_text(first_half)
        second_half_result = model_manager.classify_text(second_half)

        first_ai = first_half_result["ai_percentage"]
        second_ai = second_half_result["ai_percentage"]
        first_model = first_half_result["predicted_model"]
        second_model = second_half_result["predicted_model"]
        first_top5 = first_half_result.get("top_5_predictions", [])
        second_top5 = second_half_result.get("top_5_predictions", [])
        first_half_words = len(first_half.split())
        second_half_words = len(second_half.split())

        # Stats
        avg_halves_ai_score = (first_ai + second_ai) / 2
        variance_between_halves = abs(first_ai - second_ai)
        overall_ai_prob = (
            overall_result["ai_percentage"] / 100
            if overall_result
            else avg_halves_ai_score / 100
        )
        models_agree = first_model == second_model
        models_used = first_half_result.get("models_used", 1)
        ensemble_confidence_boost = "High" if models_used > 1 else "Low"

        # Linguistic AI/Human scores
        ling_ai = linguistic_analysis.get("linguistic_ai_score", 50)
        ling_human = linguistic_analysis.get("linguistic_human_score", 50)
        # Individual linguistic details: these live under "linguistic_features",
        # so pull them from the nested dicts rather than the top level
        features = linguistic_analysis.get("linguistic_features", {})
        burstiness = features.get("burstiness", 0.5)
        formality_score = features.get("vocabulary_analysis", {}).get("formality_score", 0.5)
        human_error_score = features.get("human_error_patterns", {}).get("human_error_score", 0.5)
        emotion_markers = features.get("human_error_patterns", {}).get("emotion_markers", 0)
        # Weighted average of model and linguistic scores
        combined_avg_ai = (avg_halves_ai_score * 0.7) + (ling_ai * 0.3)
        model_ling_agreement = abs(avg_halves_ai_score - ling_ai) < 20

        # ----- Final Decision Logic -----
        verdict = "UNCERTAIN"
        confidence = "Low"
        accuracy_percentage = 60
        reasoning = ""

        # HUMAN
        if first_ai < 50 and second_ai < 50 and second_model.lower() == "human":
            verdict = "HUMAN"
            if ling_human > ling_ai:
                confidence = "Very High"
                accuracy_percentage = 95
            elif variance_between_halves < 15:
                confidence = "High"
                accuracy_percentage = 85
            else:
                confidence = "Medium"
                accuracy_percentage = 75
            reasoning = (
                f"Both halves scored below 50% AI probability (First: {first_ai}%, Second: {second_ai}%). "
                f"Linguistic analysis confirms with {ling_human:.1f}% human indicators. "
                f"The text shows {emotion_markers} emotional markers and a human error score of {human_error_score:.2f}. "
                f"Variance between halves is {variance_between_halves:.2f}%, indicating consistent human patterns. "
            )

        # AI
        elif first_ai > 50 and second_ai > 50 and second_model.lower() != "human":
            verdict = "AI"
            if first_ai > 80 and second_ai > 80 and model_ling_agreement:
                confidence = "Very High"
                accuracy_percentage = 98
            elif first_ai > 70 and second_ai > 70:
                confidence = "High"
                accuracy_percentage = 90
            else:
                confidence = "Medium"
                accuracy_percentage = 80
            reasoning = (
                f"Both halves scored above 50% AI probability (First: {first_ai}%, Second: {second_ai}%). "
                f"Linguistic analysis confirms with {ling_ai:.1f}% AI indicators. "
                f"Detected high formality score ({formality_score:.2f}) and low burstiness ({burstiness:.2f}), typical of AI generation. "
                f"Variance between halves: {variance_between_halves:.2f}%. "
                f"Models {'agree' if models_agree else 'disagree'} across halves."
            )

        # MIXED
        elif (first_ai > 50 and second_ai < 50) or (first_ai < 50 and second_ai > 50):
            verdict = "MIXED"
            confidence = "Medium" if variance_between_halves > 30 else "Low"
            accuracy_percentage = 75
            reasoning = (
                f"Mixed signals detected. First half: {first_ai}% AI ({first_model}), "
                f"Second half: {second_ai}% AI ({second_model}). "
                f"Linguistic AI score: {ling_ai:.1f}%. "
                f"Variance between halves ({variance_between_halves:.2f}%) supports mixed authorship."
            )

        # Borderline
        else:
            if second_model.lower() == "human" or ling_human > ling_ai:
                verdict = "LIKELY_HUMAN"
                confidence = "Medium"
                accuracy_percentage = 70
            else:
                verdict = "LIKELY_AI"
                confidence = "Medium"
                accuracy_percentage = 70
            reasoning = (
                f"Borderline case: scores near 50%. "
                f"Linguistic analysis leans toward {'human' if ling_human > ling_ai else 'AI'} writing. "
                f"Variance: {variance_between_halves:.2f}%."
            )

        # ----- Final Output -----
        final_decision = {
            "verdict": verdict,
            "confidence": confidence,
            "accuracy_percentage": accuracy_percentage,
            "reasoning": reasoning,
            "supporting_data": {
                "overall_ai_prob": round(overall_ai_prob, 3),
                "avg_halves_ai_score": round(avg_halves_ai_score / 100, 3),
                "variance_between_halves": round(variance_between_halves, 2),
                "first_half_model": first_model,
                "second_half_model": second_model,
                "models_agree": models_agree,
                "ensemble_models_used": models_used,
                "ensemble_confidence": ensemble_confidence_boost,
                "linguistic_ai_score": ling_ai,
                "linguistic_human_score": ling_human,
                "model_linguistic_agreement": model_ling_agreement,
                "combined_ai_score": round(combined_avg_ai, 2),
            },
        }

        return {
            "halves_analysis_available": True,
            "cleaned_content": {
                "total_words": len(cleaned_text.split()),
                "first_half_words": first_half_words,
                "second_half_words": second_half_words,
            },
            "first_half": {
                "ai_percentage": first_ai,
                "human_percentage": first_half_result["human_percentage"],
                "predicted_model": first_model,
                "word_count": first_half_words,
                "preview": first_half[:200] + "..." if len(first_half) > 200 else first_half,
                "top_5_predictions": first_top5,
                "models_used": models_used,
                "linguistic_analysis": first_half_linguistic,
            },
            "second_half": {
                "ai_percentage": second_ai,
                "human_percentage": second_half_result["human_percentage"],
                "predicted_model": second_model,
                "word_count": second_half_words,
                "preview": second_half[:200] + "..." if len(second_half) > 200 else second_half,
                "top_5_predictions": second_top5,
                "models_used": models_used,
                "linguistic_analysis": second_half_linguistic,
            },
            "final_decision": final_decision,
            "overall_linguistic_analysis": linguistic_analysis,
        }
    except Exception as e:
        logger.error(f"Error in halves analysis: {e}", exc_info=True)
        return {
            "halves_analysis_available": False,
            "error": str(e)
        }

# =====================================================
# 📝 Pydantic Models
# =====================================================
class TextInput(BaseModel):
    text: str
    analyze_paragraphs: bool = False

class SimpleTextInput(BaseModel):
    text: str

class DetectionResult(BaseModel):
    success: bool
    code: int
    message: str
    data: Dict

# =====================================================
# 🔧 Helpers
# =====================================================
def split_into_paragraphs(text: str, min_length: int = 100) -> List[str]:
    """Split the text into paragraphs of at least min_length characters."""
    paragraphs = re.split(r'\n\s*\n', text)
    return [p.strip() for p in paragraphs if len(p.strip()) >= min_length]

# =====================================================
# 🌐 FastAPI Application
# =====================================================
app = FastAPI(
    title="ModernBERT AI Text Detector API",
    description="API for detecting AI-generated text using ModernBERT",
    version="2.0.0"
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Model Manager Instance
model_manager = ModelManager()

# =====================================================
# 🚀 Startup Event
# =====================================================
@app.on_event("startup")
async def startup_event():
    """Load the models when the application starts."""
    logger.info("🚀 Starting application...")
    logger.info("📦 Loading models...")
    success = model_manager.load_models()
    if success:
        logger.info("✅ Application ready! (Fallback mode: %s)", model_manager.using_fallback)
    else:
        logger.error("⚠️ Failed to load models - API will return errors")
        logger.info("💡 Tip: Ensure 'transformers>=4.45.0' and 'huggingface_hub' are installed. Run: pip install --upgrade transformers huggingface_hub")

@app.get("/")
async def root():
    """API index page."""
    return {
        "message": "ModernBERT AI Text Detector API",
        "status": "online" if model_manager.models_loaded else "initializing",
        "models_loaded": len(model_manager.models),
        "using_fallback": model_manager.using_fallback,
        "device": str(device),
        "endpoints": {
            "analyze": "/analyze",
            "simple": "/analyze-simple",
            "health": "/health",
            "docs": "/docs"
        }
    }

@app.get("/health")
async def health_check():
    """Service health check."""
    memory_info = {}
    if torch.cuda.is_available():
        memory_info = {
            "gpu_allocated_gb": round(torch.cuda.memory_allocated() / 1024**3, 2),
            "gpu_reserved_gb": round(torch.cuda.memory_reserved() / 1024**3, 2)
        }
    return {
        "status": "healthy" if model_manager.models_loaded else "unhealthy",
        "models_loaded": len(model_manager.models),
        "using_fallback": model_manager.using_fallback,
        "device": str(device),
        "cuda_available": torch.cuda.is_available(),
        "memory_info": memory_info
    }

@app.post("/analyze", response_model=DetectionResult)
async def analyze_text(data: TextInput):
    """
    Analyze text for AI-generated content.
    Mirrors the behavior of the original Gradio classify_text function.
    """
    try:
        # Validate the input
        text = data.text.strip()
        if not text:
            return DetectionResult(
                success=False,
                code=400,
                message="Empty input text",
                data={}
            )

        # Make sure the models are loaded
        if not model_manager.models_loaded:
            # Try loading them on demand
            if not model_manager.load_models():
                return DetectionResult(
                    success=False,
                    code=503,
                    message="Models not available. Check logs for details.",
                    data={}
                )

        # Word count
        total_words = len(text.split())

        # Base analysis
        result = model_manager.classify_text(text)

        # Base scores
        ai_percentage = result["ai_percentage"]
        human_percentage = result["human_percentage"]
        ai_words = int(total_words * (ai_percentage / 100))

        # Optional per-paragraph analysis
        paragraphs_analysis = []
        if data.analyze_paragraphs and ai_percentage > 50:
            paragraphs = split_into_paragraphs(text)
            recalc_ai_words = 0
            recalc_total_words = 0
            for para in paragraphs[:10]:  # Cap at 10 paragraphs
                if para.strip():
                    try:
                        para_result = model_manager.classify_text(para)
                        para_words = len(para.split())
                        recalc_total_words += para_words
                        recalc_ai_words += para_words * (para_result["ai_percentage"] / 100)
                        paragraphs_analysis.append({
                            "paragraph": para[:200] + "..." if len(para) > 200 else para,
                            "ai_generated_score": para_result["ai_percentage"] / 100,
                            "human_written_score": para_result["human_percentage"] / 100,
                            "predicted_model": para_result["predicted_model"]
                        })
                    except Exception as e:
                        logger.warning(f"Failed to analyze paragraph: {e}")

            # Recompute the overall percentages from the paragraphs
            if recalc_total_words > 0:
                ai_percentage = round((recalc_ai_words / recalc_total_words) * 100, 2)
                human_percentage = round(100 - ai_percentage, 2)
                ai_words = int(recalc_ai_words)

        # 🆕 NEW FEATURE: Analyze content by halves (pass the overall result for variance calculation)
        halves_analysis = analyze_content_halves(model_manager, text, result)

        # Feedback message
        if ai_percentage > 50:
            feedback = "Most of Your Text is AI/GPT Generated"
        else:
            feedback = "Most of Your Text Appears Human-Written"

        # Return results in the original response format, plus the halves analysis
        return DetectionResult(
            success=True,
            code=200,
            message="analysis completed",
            data={
                "fakePercentage": ai_percentage,
                "isHuman": human_percentage,
                "textWords": total_words,
                "aiWords": ai_words,
                "paragraphs": paragraphs_analysis,
                "predicted_model": result["predicted_model"],
                "feedback": feedback,
                "input_text": text[:500] + "..." if len(text) > 500 else text,
                "detected_language": "en",
                "top_5_predictions": result.get("top_5_predictions", []),
                "models_used": result.get("models_used", 1),
                "using_fallback": result.get("using_fallback", False),
                # 🆕 NEW: Halves analysis appended to the response
                "halves_analysis": halves_analysis
            }
        )
    except Exception as e:
        logger.error(f"Analysis error: {e}", exc_info=True)
        return DetectionResult(
            success=False,
            code=500,
            message=f"Analysis failed: {str(e)}",
            data={}
        )

@app.post("/analyze-simple")
async def analyze_simple(data: SimpleTextInput):
    """Simplified analysis: returns only the core scores."""
    try:
        text = data.text.strip()
        if not text:
            raise HTTPException(status_code=400, detail="Empty text")

        if not model_manager.models_loaded:
            if not model_manager.load_models():
                raise HTTPException(status_code=503, detail="Models not available")

        result = model_manager.classify_text(text)
        return {
            "is_ai": result["ai_percentage"] > 50,
            "ai_score": result["ai_percentage"],
            "human_score": result["human_percentage"],
            "detected_model": result["predicted_model"] if result["ai_percentage"] > 50 else None,
            "confidence": max(result["ai_percentage"], result["human_percentage"]),
            "using_fallback": result.get("using_fallback", False)
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Simple analysis error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# =====================================================
# 🏃 Application entry point
# =====================================================
if __name__ == "__main__":
    import uvicorn

    # Read settings from the environment
    port = int(os.environ.get("PORT", 8000))
    host = os.environ.get("HOST", "0.0.0.0")
    workers = int(os.environ.get("WORKERS", 1))

    logger.info("=" * 50)
    logger.info(f"🌐 Starting server on {host}:{port}")
    logger.info(f"👷 Workers: {workers}")
    logger.info(f"📚 Documentation: http://{host}:{port}/docs")
    logger.info("=" * 50)

    uvicorn.run(
        "main:app",  # Assumes this file is named main.py
        host=host,
        port=port,
        workers=workers,
        reload=False  # Set to True for development
    )
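
# Example requests (a sketch; assumes the server is reachable on localhost:8000):
#
#   curl -s -X POST http://localhost:8000/analyze \
#        -H "Content-Type: application/json" \
#        -d '{"text": "Your text here...", "analyze_paragraphs": false}'
#
#   curl -s -X POST http://localhost:8000/analyze-simple \
#        -H "Content-Type: application/json" \
#        -d '{"text": "Your text here..."}'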