Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request, Header, HTTPException, Query | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi.openapi.docs import get_swagger_ui_html | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from datetime import datetime | |
| import uuid | |
| from transformers import pipeline | |
| import logging, traceback | |
| from typing import Optional, List, Union | |
| from model import ( | |
| summarize_review, smart_summarize, detect_industry, | |
| detect_product_category, detect_emotion, answer_followup, answer_only, | |
| assess_churn_risk # β Add this | |
| ) | |
| app = FastAPI( | |
| title="π§ NeuroPulse AI", | |
| description="Multilingual GenAI for smarter feedback β summarization, sentiment, emotion, aspects, Q&A and tags.", | |
| version="2025.1.0", | |
| openapi_url="/openapi.json", | |
| docs_url=None, | |
| redoc_url="/redoc" | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| logging.basicConfig(level=logging.INFO) | |
| VALID_API_KEY = "my-secret-key" | |
| log_store = [] | |
| def root(): | |
| return "<h1>NeuroPulse AI Backend is Running</h1>" | |
| def custom_swagger_ui(): | |
| return get_swagger_ui_html( | |
| openapi_url=app.openapi_url, | |
| title="π§ Swagger UI - NeuroPulse AI", | |
| swagger_favicon_url="https://cdn-icons-png.flaticon.com/512/3794/3794616.png", | |
| swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@4.18.3/swagger-ui-bundle.js", | |
| swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@4.18.3/swagger-ui.css", | |
| ) | |
| async def exception_handler(request: Request, exc: Exception): | |
| logging.error(f"Unhandled Exception: {traceback.format_exc()}") | |
| return JSONResponse(status_code=500, content={"detail": "Internal Server Error. Please contact support."}) | |
| # ==== SCHEMAS ==== | |
| class ReviewInput(BaseModel): | |
| text: str | |
| model: str = "distilbert-base-uncased-finetuned-sst-2-english" | |
| industry: Optional[str] = None | |
| aspects: bool = False | |
| follow_up: Optional[Union[str, List[str]]] = None | |
| product_category: Optional[str] = None | |
| device: Optional[str] = None | |
| intelligence: Optional[bool] = False | |
| verbosity: Optional[str] = "detailed" | |
| class BulkReviewInput(BaseModel): | |
| reviews: List[str] | |
| model: str = "distilbert-base-uncased-finetuned-sst-2-english" | |
| industry: Optional[List[str]] = None | |
| aspects: bool = False | |
| product_category: Optional[List[str]] = None | |
| device: Optional[List[str]] = None | |
| follow_up: Optional[List[Union[str, List[str]]]] = None | |
| intelligence: Optional[bool] = False | |
| class FollowUpRequest(BaseModel): | |
| text: str | |
| question: str | |
| verbosity: Optional[str] = "brief" | |
| # ==== HELPERS ==== | |
| def auto_fill(value: Optional[str], fallback: str) -> str: | |
| if not value or value.lower() == "auto-detect": | |
| return fallback | |
| return value | |
| # ==== ENDPOINTS ==== | |
| async def analyze(data: ReviewInput, x_api_key: str = Header(None)): | |
| if x_api_key and x_api_key != VALID_API_KEY: | |
| raise HTTPException(status_code=401, detail="β Invalid API key") | |
| if len(data.text.split()) < 20: | |
| raise HTTPException(status_code=400, detail="β οΈ Review too short for analysis (min. 20 words).") | |
| try: | |
| response = {} | |
| if not data.follow_up: | |
| summary = ( | |
| summarize_review(data.text, max_len=40, min_len=8) | |
| if data.verbosity.lower() == "brief" | |
| else smart_summarize(data.text, n_clusters=2 if data.intelligence else 1) | |
| ) | |
| sentiment_pipeline = pipeline("sentiment-analysis", model=data.model) | |
| sentiment = sentiment_pipeline(data.text)[0] | |
| emotion_raw = detect_emotion(data.text) | |
| emotion = emotion_raw["label"] if isinstance(emotion_raw, dict) and "label" in emotion_raw else str(emotion_raw) | |
| churn_risk = assess_churn_risk(sentiment["label"], emotion) | |
| # Log churn risk analysis | |
| log_entry = { | |
| "timestamp": datetime.now(), | |
| "product": data.product_category or "Generic", | |
| "churn_risk": churn_risk, | |
| "user_id": str(uuid.uuid4()) # Optional | |
| } | |
| log_store.append(log_entry) | |
| if len(log_store) > 1000: | |
| log_store = log_store[-1000:] # keep latest 1000 entries | |
| pain_points = [] | |
| if data.aspects: | |
| from model import extract_pain_points # π Import inline if not already | |
| pain_points = extract_pain_points(data.text) | |
| industry = detect_industry(data.text) if not data.industry or "auto" in data.industry.lower() else data.industry | |
| product_category = detect_product_category(data.text) if not data.product_category or "auto" in data.product_category.lower() else data.product_category | |
| response = { | |
| "summary": summary, | |
| "sentiment": sentiment, | |
| "emotion": emotion, | |
| "product_category": product_category, | |
| "device": "Web", | |
| "industry": industry, | |
| "churn_risk": churn_risk, | |
| "pain_points": pain_points | |
| } | |
| if data.follow_up: | |
| response["follow_up"] = answer_followup(data.text, data.follow_up, verbosity=data.verbosity) | |
| return response | |
| except Exception as e: | |
| logging.error(f"π₯ Unexpected analysis failure: {traceback.format_exc()}") | |
| raise HTTPException(status_code=500, detail="Internal Server Error during analysis. Please contact support.") | |
| async def followup(request: FollowUpRequest, x_api_key: str = Header(None)): | |
| if x_api_key and x_api_key != VALID_API_KEY: | |
| raise HTTPException(status_code=401, detail="Invalid API key") | |
| if not request.question or len(request.text.split()) < 10: | |
| raise HTTPException(status_code=400, detail="Question or text is too short.") | |
| try: | |
| answer = answer_only(request.text, request.question) | |
| return {"answer": answer} | |
| except Exception as e: | |
| logging.error(f"β Follow-up failed: {traceback.format_exc()}") | |
| raise HTTPException(status_code=500, detail="Internal Server Error during follow-up.") | |
| async def get_churn_log(x_api_key: str = Header(None)): | |
| if x_api_key and x_api_key != VALID_API_KEY: | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| return {"log": log_store} | |
| async def bulk_analyze(data: BulkReviewInput, token: str = Query(None)): | |
| if token != VALID_API_KEY: | |
| raise HTTPException(status_code=401, detail="β Unauthorized: Invalid API token") | |
| try: | |
| results = [] | |
| sentiment_pipeline = pipeline("sentiment-analysis", model=data.model) | |
| for i, review_text in enumerate(data.reviews): | |
| if len(review_text.split()) < 20: | |
| results.append({ | |
| "review": review_text, | |
| "error": "Too short to analyze" | |
| }) | |
| continue | |
| summary = smart_summarize(review_text, n_clusters=2 if data.intelligence else 1) | |
| sentiment = sentiment_pipeline(review_text)[0] | |
| emotion = detect_emotion(review_text) | |
| churn = assess_churn_risk(sentiment["label"], emotion) | |
| pain = extract_pain_points(review_text) if data.aspects else [] | |
| # π Log churn data | |
| log_entry = { | |
| "timestamp": datetime.now(), | |
| "product": prod, | |
| "churn_risk": churn, | |
| "user_id": str(uuid.uuid4()) | |
| } | |
| log_store.append(log_entry) | |
| if len(log_store) > 1000: | |
| log_store = log_store[-1000:] | |
| ind = auto_fill(data.industry[i] if data.industry else None, detect_industry(review_text)) | |
| prod = auto_fill(data.product_category[i] if data.product_category else None, detect_product_category(review_text)) | |
| dev = auto_fill(data.device[i] if data.device else None, "Web") | |
| result = { | |
| "review": review_text, | |
| "summary": summary, | |
| "sentiment": sentiment["label"], | |
| "score": sentiment["score"], | |
| "emotion": emotion, | |
| "industry": ind, | |
| "product_category": prod, | |
| "device": dev, | |
| "churn_risk": churn, | |
| "pain_points": pain | |
| } | |
| if data.follow_up and i < len(data.follow_up): | |
| follow_q = data.follow_up[i] | |
| result["follow_up"] = answer_followup(review_text, follow_q) | |
| results.append(result) | |
| return {"results": results} | |
| except Exception as e: | |
| logging.error(f"π₯ Bulk processing failed: {traceback.format_exc()}") | |
| raise HTTPException(status_code=500, detail="Failed to analyze bulk reviews") | |