import os import time import threading import torch import base64 import io import uuid import requests import numpy as np import asyncio from typing import List, Dict, Any, Optional, Union from fastapi import FastAPI, HTTPException, Depends, Request from fastapi.concurrency import run_in_threadpool from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel, Field from dotenv import load_dotenv from huggingface_hub import snapshot_download from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer, AutoModelForImageClassification, ViTImageProcessor from detoxify import Detoxify from PIL import Image import uvicorn from datetime import datetime, timedelta from collections import defaultdict, deque import tiktoken load_dotenv() os.makedirs("templates", exist_ok=True) os.makedirs("static", exist_ok=True) MODEL_REPO = "daniel-dona/gemma-3-270m-it" LOCAL_DIR = os.path.join(os.getcwd(), "local_model") os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1") os.environ.setdefault("OMP_NUM_THREADS", str(os.cpu_count() or 2)) os.environ.setdefault("MKL_NUM_THREADS", os.environ["OMP_NUM_THREADS"]) os.environ.setdefault("OMP_PROC_BIND", "TRUE") torch.set_num_threads(int(os.environ["OMP_NUM_THREADS"])) torch.set_num_interop_threads(1) torch.set_float32_matmul_precision("high") app = FastAPI(title="Smart Moderator API", description="Advanced content moderation API powered by AI") app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") def ensure_local_model(repo_id: str, local_dir: str, tries: int = 3, sleep_s: float = 3.0) -> str: os.makedirs(local_dir, exist_ok=True) for i in range(tries): try: snapshot_download( repo_id=repo_id, local_dir=local_dir, local_dir_use_symlinks=False, resume_download=True, allow_patterns=["*.json", "*.model", "*.safetensors", "*.bin", "*.txt", "*.py"] ) return local_dir except Exception: if i == tries - 1: raise time.sleep(sleep_s * (2 ** i)) return local_dir print("Loading models...") model_path = ensure_local_model(MODEL_REPO, LOCAL_DIR) tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True) gemma_chat_template_simplified = ( "{% for message in messages %}" "{% if message['role'] == 'user' %}" "{{ 'user\\n' + message['content'] | trim + '\\n' }}" "{% elif message['role'] == 'assistant' %}" "{{ 'model\\n' + message['content'] | trim + '\\n' }}" "{% endif %}" "{% endfor %}" "{% if add_generation_prompt %}" "{{ 'model\\n' }}" "{% endif %}" ) if tokenizer.chat_template is None: tokenizer.chat_template = gemma_chat_template_simplified model = AutoModelForCausalLM.from_pretrained( model_path, local_files_only=True, torch_dtype=torch.float32, device_map=None ) model.eval() detoxify_model = Detoxify('multilingual') print("Loading NSFW image classification model...") nsfw_model = AutoModelForImageClassification.from_pretrained("Falconsai/nsfw_image_detection") nsfw_processor = ViTImageProcessor.from_pretrained('Falconsai/nsfw_image_detection') nsfw_model.eval() print("NSFW image classification model loaded.") MODERATION_SYSTEM_PROMPT = ( "You are a multilingual content moderation classifier. " "You MUST respond with exactly one lowercase letter: 's' for safe, 'u' for unsafe. " "No explanations, no punctuation, no extra words. " "If the message contains hate speech, harassment, sexual content involving minors, " "extreme violence, self-harm encouragement, or other unsafe material, respond 'u'. " "Otherwise respond 's'." ) request_durations = deque(maxlen=100) request_timestamps = deque(maxlen=1000) daily_requests = defaultdict(int) daily_tokens = defaultdict(int) concurrent_requests = 0 concurrent_requests_lock = threading.Lock() encoding = tiktoken.get_encoding("cl100k_base") def count_tokens(text): return len(encoding.encode(text)) def track_request_metrics(start_time, tokens_count): end_time = time.time() duration = end_time - start_time request_durations.append(duration) request_timestamps.append(datetime.now()) today = datetime.now().strftime("%Y-%m-%d") daily_requests[today] += 1 daily_tokens[today] += tokens_count def get_performance_metrics(): global concurrent_requests with concurrent_requests_lock: current_concurrent = concurrent_requests if not request_durations: avg_request_time = 0 peak_request_time = 0 else: avg_request_time = sum(request_durations) / len(request_durations) peak_request_time = max(request_durations) now = datetime.now() one_minute_ago = now - timedelta(seconds=60) requests_last_minute = sum(1 for ts in request_timestamps if ts > one_minute_ago) today = now.strftime("%Y-%m-%d") today_requests = daily_requests.get(today, 0) today_tokens = daily_tokens.get(today, 0) last_7_days = [] for i in range(7): date = (now - timedelta(days=i)).strftime("%Y-%m-%d") last_7_days.append({ "date": date, "requests": daily_requests.get(date, 0), "tokens": daily_tokens.get(date, 0) }) return { "avg_request_time_ms": avg_request_time * 1000, "peak_request_time_ms": peak_request_time * 1000, "requests_per_minute": requests_last_minute, "concurrent_requests": current_concurrent, "today_requests": today_requests, "today_tokens": today_tokens, "last_7_days": last_7_days } class TextContent(BaseModel): type: str = Field("text", description="Type of content") text: str = Field(..., description="Text content") class ImageContent(BaseModel): type: str = Field("image", description="Type of content") url: Optional[str] = Field(None, description="URL of the image") base64: Optional[str] = Field(None, description="Base64 encoded image") class ModerationRequest(BaseModel): input: Union[str, List[Union[str, TextContent, ImageContent]]] = Field(..., description="Content to moderate") model: Optional[str] = Field("gemma", description="Model to use for text moderation (gemma, detoxify, both)") class ModerationResponse(BaseModel): id: str object: str created: int model: str results: List[Dict[str, Any]] def build_prompt(message, max_ctx_tokens=128): full_user_message = f"{MODERATION_SYSTEM_PROMPT}\n\nUser input: '{message}'" messages = [{"role": "user", "content": full_user_message}] text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) while len(tokenizer(text, add_special_tokens=False).input_ids) > max_ctx_tokens and len(full_user_message) > 100: full_user_message = full_user_message[:-50] messages[0]['content'] = full_user_message text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) return text def enforce_s_u(text: str) -> str: text_lower = text.strip().lower() if "u" in text_lower and "s" not in text_lower: return "u" if "unsafe" in text_lower: return "u" return "s" def classify_text_with_gemma(message, max_tokens=3, temperature=0.1, top_p=0.95): if not message.strip(): return { "classification": "s", "label": "SAFE", "description": "Content appears to be safe and appropriate.", "tokens_per_second": 0, "processing_time": 0 } text = build_prompt(message) inputs = tokenizer([text], return_tensors="pt").to(model.device) do_sample = bool(temperature and temperature > 0.0) gen_kwargs = dict( max_new_tokens=max_tokens, do_sample=do_sample, top_p=top_p, temperature=temperature if do_sample else None, use_cache=True, eos_token_id=tokenizer.eos_token_id, pad_token_id=tokenizer.eos_token_id ) try: streamer = TextIteratorStreamer(tokenizer, skip_special_tokens=True, skip_prompt=True) except TypeError: streamer = TextIteratorStreamer(tokenizer, skip_special_tokens=True) thread = threading.Thread( target=model.generate, kwargs={**inputs, **{k: v for k, v in gen_kwargs.items() if v is not None}, "streamer": streamer} ) partial_text = "" token_count = 0 start_time = None with torch.inference_mode(): thread.start() try: for chunk in streamer: if start_time is None: start_time = time.time() partial_text += chunk token_count += 1 finally: thread.join() final_label = enforce_s_u(partial_text) end_time = time.time() if start_time else time.time() duration = max(1e-6, end_time - start_time) tps = token_count / duration if duration > 0 else 0.0 if final_label == "s": label = "SAFE" description = "Content appears to be safe and appropriate." else: label = "UNSAFE" description = "Content may contain inappropriate or harmful material." return { "classification": final_label, "label": label, "description": description, "tokens_per_second": tps, "processing_time": duration } def classify_text_with_detoxify(text): predictions = detoxify_model.predict([text]) single_prediction = {key: value[0] for key, value in predictions.items()} category_keys = [ "toxicity", "severe_toxicity", "obscene", "threat", "insult", "identity_attack", "sexual_explicit" ] scores = {} for key in category_keys: scores[key] = float(single_prediction.get(key, 0.0)) threshold = 0.5 bool_categories = {key: (scores[key] > threshold) for key in category_keys} flagged = any(bool_categories.values()) return { "flagged": flagged, "categories": bool_categories, "category_scores": scores } def classify_image(image_data): try: img = Image.open(io.BytesIO(image_data)).convert("RGB") with torch.no_grad(): inputs = nsfw_processor(images=img, return_tensors="pt") inputs = {k: v.to(nsfw_model.device) for k, v in inputs.items()} outputs = nsfw_model(**inputs) logits = outputs.logits predicted_label = logits.argmax(-1).item() label = nsfw_model.config.id2label[predicted_label] confidence = torch.softmax(logits, dim=-1)[0][predicted_label].item() if label.lower() == "nsfw": classification = "u" nsfw_score = confidence else: classification = "s" nsfw_score = 1.0 - confidence return { "classification": classification, "label": "NSFW" if classification == 'u' else "SFW", "description": "Content may contain inappropriate or harmful material." if classification == 'u' else "Content appears to be safe and appropriate.", "confidence": confidence, "nsfw_score": nsfw_score } except Exception as e: print(f"Error in classify_image: {str(e)}") return { "classification": "s", "label": "ERROR", "description": f"Error processing image: {str(e)}", "confidence": 0.0, "nsfw_score": 0.0 } def process_content_item(item: Union[str, TextContent, ImageContent], text_model: str = "gemma") -> Dict: work_item = {} if isinstance(item, str): work_item = {"type": "text", "text": item} elif isinstance(item, (TextContent, ImageContent)): work_item = item.model_dump() else: # This case should ideally not be hit with proper Pydantic validation return { "flagged": False, "error": f"Unsupported item type: {type(item).__name__}" } content_type = work_item.get("type") if content_type == "text": text = work_item.get("text", "") if text_model == "gemma": gemma_result = classify_text_with_gemma(text) flagged = gemma_result["classification"] == "u" scores = { "hate": 0.9 if flagged else 0.1, "hate/threatening": 0.9 if flagged else 0.1, "harassment": 0.9 if flagged else 0.1, "harassment/threatening": 0.9 if flagged else 0.1, "self-harm": 0.9 if flagged else 0.1, "self-harm/intent": 0.9 if flagged else 0.1, "self-harm/instructions": 0.9 if flagged else 0.1, "sexual": 0.9 if flagged else 0.1, "sexual/minors": 0.9 if flagged else 0.1, "violence": 0.9 if flagged else 0.1, "violence/graphic": 0.9 if flagged else 0.1, "nsfw": 0.1, } return {"flagged": flagged, "categories": {k: (v > 0.5) for k, v in scores.items()}, "category_scores": scores, "text": text} elif text_model == "detoxify": d = classify_text_with_detoxify(text) scores = { "hate": d["category_scores"].get("toxicity", 0.1), "hate/threatening": d["category_scores"].get("threat", 0.1), "harassment": d["category_scores"].get("insult", 0.1), "harassment/threatening": d["category_scores"].get("threat", 0.1), "self-harm": 0.1, "self-harm/intent": 0.1, "self-harm/instructions": 0.1, "sexual": d["category_scores"].get("sexual_explicit", 0.1), "sexual/minors": d["category_scores"].get("sexual_explicit", 0.1), "violence": d["category_scores"].get("threat", 0.1), "violence/graphic": d["category_scores"].get("threat", 0.1), "nsfw": d["category_scores"].get("sexual_explicit", 0.1), } return {"flagged": d["flagged"], "categories": {k: (v > 0.5) for k, v in scores.items()}, "category_scores": scores, "text": text} elif text_model == "both": gemma_result = classify_text_with_gemma(text) detoxify_result = classify_text_with_detoxify(text) flagged = gemma_result["classification"] == "u" or detoxify_result["flagged"] scores = { "hate": max(0.9 if gemma_result["classification"] == "u" else 0.1, detoxify_result["category_scores"].get("toxicity", 0.1)), "hate/threatening": max(0.9 if gemma_result["classification"] == "u" else 0.1, detoxify_result["category_scores"].get("threat", 0.1)), "harassment": max(0.9 if gemma_result["classification"] == "u" else 0.1, detoxify_result["category_scores"].get("insult", 0.1)), "harassment/threatening": max(0.9 if gemma_result["classification"] == "u" else 0.1, detoxify_result["category_scores"].get("threat", 0.1)), "self-harm": 0.9 if gemma_result["classification"] == "u" else 0.1, "self-harm/intent": 0.9 if gemma_result["classification"] == "u" else 0.1, "self-harm/instructions": 0.9 if gemma_result["classification"] == "u" else 0.1, "sexual": max(0.9 if gemma_result["classification"] == "u" else 0.1, detoxify_result["category_scores"].get("sexual_explicit", 0.1)), "sexual/minors": max(0.9 if gemma_result["classification"] == "u" else 0.1, detoxify_result["category_scores"].get("sexual_explicit", 0.1)), "violence": max(0.9 if gemma_result["classification"] == "u" else 0.1, detoxify_result["category_scores"].get("threat", 0.1)), "violence/graphic": max(0.9 if gemma_result["classification"] == "u" else 0.1, detoxify_result["category_scores"].get("threat", 0.1)), "nsfw": detoxify_result["category_scores"].get("sexual_explicit", 0.1), } return {"flagged": flagged, "categories": {k: (v > 0.5) for k, v in scores.items()}, "category_scores": scores, "text": text} elif content_type == "image": image_data = None image_url = work_item.get("url") image_base64 = work_item.get("base64") if image_url: try: response = requests.get(image_url, timeout=10) response.raise_for_status() image_data = response.content except requests.RequestException as e: print(f"Error fetching image from URL {image_url}: {e}") elif image_base64: try: if image_base64.startswith("data:image"): image_base64 = image_base64.split(",")[1] image_data = base64.b64decode(image_base64) except Exception as e: print(f"Error decoding base64 image: {e}") if image_data: image_result = classify_image(image_data) flagged = image_result["classification"] == "u" nsfw_score = image_result.get("nsfw_score", 0.1) scores = { "hate": 0.1, "hate/threatening": 0.1, "harassment": 0.1, "harassment/threatening": 0.1, "self-harm": 0.1, "self-harm/intent": 0.1, "self-harm/instructions": 0.1, "sexual": nsfw_score, "sexual/minors": nsfw_score, "violence": 0.1, "violence/graphic": 0.1, "nsfw": nsfw_score, } return { "flagged": flagged, "categories": {k: (v > 0.5) for k, v in scores.items()}, "category_scores": scores, "image_url": image_url, "image_base64": work_item.get("base64"), } default_scores = { "hate": 0.1, "hate/threatening": 0.1, "harassment": 0.1, "harassment/threatening": 0.1, "self-harm": 0.1, "self-harm/intent": 0.1, "self-harm/instructions": 0.1, "sexual": 0.1, "sexual/minors": 0.1, "violence": 0.1, "violence/graphic": 0.1, "nsfw": 0.1 } return { "flagged": False, "categories": {k: False for k in default_scores}, "category_scores": default_scores, "error": f"Invalid or unprocessable item: {work_item}" } def get_api_key(request: Request): api_key = request.headers.get("Authorization") or request.query_params.get("api_key") if not api_key: raise HTTPException(status_code=401, detail="API key required") if api_key.startswith("Bearer "): api_key = api_key[7:] env_api_key = os.getenv("API_KEY") if not env_api_key or api_key != env_api_key: raise HTTPException(status_code=401, detail="Invalid API key") return api_key @app.get("/", response_class=HTMLResponse) async def get_home(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.post("/v1/moderations", response_model=ModerationResponse) async def moderate_content( request: ModerationRequest, api_key: str = Depends(get_api_key) ): global concurrent_requests with concurrent_requests_lock: concurrent_requests += 1 start_time = time.time() total_tokens = 0 try: input_data = request.input text_model = request.model or "gemma" items = [] if isinstance(input_data, str): items.append(input_data) total_tokens += count_tokens(input_data) elif isinstance(input_data, list): items.extend(input_data) for item in items: if isinstance(item, str): total_tokens += count_tokens(item) elif isinstance(item, TextContent): total_tokens += count_tokens(item.text) else: raise HTTPException(status_code=400, detail="Invalid input format") if len(items) > 10: raise HTTPException(status_code=400, detail="Too many input items. Maximum 10 allowed.") tasks = [run_in_threadpool(process_content_item, item, text_model) for item in items] results = await asyncio.gather(*tasks) response_data = { "id": f"modr_{uuid.uuid4().hex[:24]}", "object": "moderation", "created": int(time.time()), "model": text_model, "results": list(results) } track_request_metrics(start_time, total_tokens) return response_data finally: with concurrent_requests_lock: concurrent_requests -= 1 @app.get("/v1/metrics") async def get_metrics(api_key: str = Depends(get_api_key)): return get_performance_metrics() with open("templates/index.html", "w", encoding='utf-8') as f: f.write(""" Smart Moderator

Smart Moderator

Advanced, multilingual and multimodal content classification tool powered by AI

API Configuration

API Endpoints

POST /v1/moderations
GET /v1/metrics

Content Analysis

Performance Metrics

Avg. Response Time
0ms
Concurrent Requests
0
Requests/Minute
0
Today's Requests
0

Example Prompts

"Hello, how are you today? I hope you're having a wonderful time!"

"I hate you and I will find you and hurt you badly."

"C'est une belle journée pour apprendre la programmation et l'intelligence artificielle."

"I can't take this anymore. I want to end everything and disappear forever."

"¡Hola! Me encanta aprender nuevos idiomas y conocer diferentes culturas."

"You're absolutely worthless and nobody will ever love someone like you."

About This Tool

Multilingual

Supports content analysis in multiple languages with high accuracy.

Multimodal

Analyzes both text and images for comprehensive content moderation. (Falcons.AI for multimodal model)

Secure

API key authentication ensures your requests remain secure and private.

© 2025 Smart Moderator. All rights reserved.

""") print("Initializing models...") with torch.inference_mode(): _ = model.generate( **tokenizer(["Hello"], return_tensors="pt").to(model.device), max_new_tokens=1, do_sample=False, use_cache=True ) print("🚀 Starting AI Content Moderator API...") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)