# api/endpoints.py
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
"""API endpoints for chat, media generation/analysis and conversation management.

All generation endpoints share the same flow:

1. Anonymous users are allowed up to 4 messages, tracked by a MongoDB-backed
   session counter (``handle_session``).
2. A model is chosen via ``select_model`` and verified with
   ``check_model_availability`` (falling back to ``SECONDARY_MODEL_NAME``
   where applicable).
3. The result of ``request_generation`` is streamed or collected and returned.
"""

import asyncio
import io
import json
import logging
import os
import time
import uuid
from datetime import datetime
from typing import List, Optional

import psutil
from fastapi import (
    APIRouter,
    Body,
    Depends,
    File,
    HTTPException,
    Request,
    UploadFile,
    status,
)
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorClient
from openai import OpenAI
from pydantic import BaseModel
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from api.auth import current_active_user
from api.database import Conversation, Message, User, get_db
from api.models import ConversationCreate, ConversationOut, QueryRequest, UserUpdate
from utils.constants import (
    ASR_MODEL,
    CLIP_BASE_MODEL,
    CLIP_LARGE_MODEL,
    IMAGE_GEN_MODEL,
    IMAGE_INFERENCE_API,
    MODEL_ALIASES,
    MODEL_NAME,
    SECONDARY_IMAGE_GEN_MODEL,
    SECONDARY_MODEL_NAME,
    TERTIARY_MODEL_NAME,
    TTS_MODEL,
)
from utils.generation import check_model_availability, request_generation, select_model
from utils.web_search import web_search

router = APIRouter()
logger = logging.getLogger(__name__)

# Hugging Face tokens: the primary token is mandatory, the backup is optional.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    logger.error("HF_TOKEN is not set in environment variables.")
    raise ValueError("HF_TOKEN is required for Inference API.")

BACKUP_HF_TOKEN = os.getenv("BACKUP_HF_TOKEN")
if not BACKUP_HF_TOKEN:
    logger.warning(
        "BACKUP_HF_TOKEN is not set. Fallback to secondary model will not work "
        "if primary token fails."
    )

ROUTER_API_URL = os.getenv("ROUTER_API_URL", "https://router.huggingface.co")
API_ENDPOINT = os.getenv("API_ENDPOINT", "https://router.huggingface.co/v1")
FALLBACK_API_ENDPOINT = os.getenv(
    "FALLBACK_API_ENDPOINT", "https://api-inference.huggingface.co/v1"
)

# MongoDB is only used here to rate-limit anonymous (not logged-in) sessions.
MONGO_URI = os.getenv("MONGODB_URI")
client = AsyncIOMotorClient(MONGO_URI)
db = client["hager"]
session_message_counts = db["session_message_counts"]


class ImageGenRequest(BaseModel):
    """Request body for image generation."""

    # Prompt describing the image to generate.
    prompt: str
    # "image" for binary image output; any other value yields a text response.
    output_format: str = "image"


async def handle_session(request: Request) -> str:
    """Track an anonymous visitor's message count and enforce the free quota.

    Creates a session id cookie on first contact, increments the per-session
    message counter in MongoDB, and raises 403 once the visitor has sent more
    than 4 messages.

    Returns:
        The session id for this visitor.

    Raises:
        HTTPException: 500 if the session middleware is missing,
            403 once the anonymous message limit is exceeded.
    """
    if not hasattr(request, "session"):
        raise HTTPException(status_code=500, detail="Session middleware not configured")
    session_id = request.session.get("session_id")
    if not session_id:
        session_id = str(uuid.uuid4())
        request.session["session_id"] = session_id
        await session_message_counts.insert_one(
            {"session_id": session_id, "message_count": 0}
        )
    session_doc = await session_message_counts.find_one({"session_id": session_id})
    if not session_doc:
        # Stale cookie without a backing document (e.g. the DB was wiped):
        # recreate the counter so the request can proceed.
        session_doc = {"session_id": session_id, "message_count": 0}
        await session_message_counts.insert_one(session_doc)
    message_count = session_doc["message_count"] + 1
    await session_message_counts.update_one(
        {"session_id": session_id}, {"$set": {"message_count": message_count}}
    )
    if message_count > 4:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Message limit reached. Please log in to continue.",
        )
    return session_id


def enhance_system_prompt(
    system_prompt: str, message: str, user: Optional[User] = None
) -> str:
    """Augment a system prompt with Arabic handling and the user's profile.

    If *message* contains any character in the Arabic Unicode block
    (U+0600-U+06FF) an instruction to answer in Arabic is appended; if *user*
    has ``additional_info`` their profile and conversation style are appended.
    """
    enhanced_prompt = system_prompt
    if any(0x0600 <= ord(char) <= 0x06FF for char in message):
        enhanced_prompt += (
            "\nRespond in Arabic with clear, concise, and accurate information "
            "tailored to the user's query."
        )
    if user and user.additional_info:
        enhanced_prompt += (
            f"\nUser Profile: {user.additional_info}"
            f"\nConversation Style: {user.conversation_style or 'default'}"
        )
    return enhanced_prompt


async def _get_or_create_conversation(
    db: AsyncSession, user: User, title: str
) -> Conversation:
    """Return the user's most recently updated conversation, creating one if none exists.

    BUGFIX: the original used ``scalar_one_or_none()`` on an unlimited ordered
    query, which raises ``MultipleResultsFound`` as soon as the user has more
    than one conversation; ``scalars().first()`` takes the newest row instead.
    """
    result = await db.execute(
        select(Conversation)
        .filter(Conversation.user_id == user.id)
        .order_by(Conversation.updated_at.desc())
    )
    conversation = result.scalars().first()
    if not conversation:
        conversation = Conversation(
            conversation_id=str(uuid.uuid4()),
            user_id=user.id,
            title=title,
        )
        db.add(conversation)
        await db.commit()
        await db.refresh(conversation)
    return conversation


@router.get("/api/settings")
async def get_settings(user: User = Depends(current_active_user)):
    """Return the model/style catalogue plus the current user's settings."""
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    return {
        "available_models": [
            {"alias": "advanced", "description": "High-performance model for complex queries"},
            {"alias": "standard", "description": "Balanced model for general use"},
            {"alias": "light", "description": "Lightweight model for quick responses"},
        ],
        "conversation_styles": ["default", "concise", "analytical", "creative"],
        "user_settings": {
            "display_name": user.display_name,
            "preferred_model": user.preferred_model,
            "job_title": user.job_title,
            "education": user.education,
            "interests": user.interests,
            "additional_info": user.additional_info,
            "conversation_style": user.conversation_style,
        },
    }


@router.get("/api/model-info")
async def model_info():
    """Public (no-auth) listing of model aliases and API endpoints."""
    return {
        "available_models": [
            {"alias": "advanced", "description": "High-performance model for complex queries"},
            {"alias": "standard", "description": "Balanced model for general use"},
            {"alias": "light", "description": "Lightweight model for quick responses"},
            {"alias": "image_base", "description": "Basic image analysis model"},
            {"alias": "image_advanced", "description": "Advanced image analysis model"},
            {"alias": "audio", "description": "Audio transcription model (default)"},
            {"alias": "tts", "description": "Text-to-speech model (default)"},
        ],
        "api_base": API_ENDPOINT,
        "fallback_api_base": FALLBACK_API_ENDPOINT,
        "status": "online",
    }


@router.get("/api/performance")
async def performance_stats():
    """Report queue/concurrency configuration and host uptime."""
    return {
        "queue_size": int(os.getenv("QUEUE_SIZE", 80)),
        "concurrency_limit": int(os.getenv("CONCURRENCY_LIMIT", 20)),
        # Host uptime in seconds (since boot, not since app start).
        "uptime": time.time() - psutil.boot_time(),
    }


@router.post("/api/chat")
async def chat_endpoint(
    request: Request,
    req: QueryRequest,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Main chat endpoint: streams text chunks (or returns WAV audio).

    Anonymous users are quota-checked; logged-in users get the message
    persisted into their latest conversation and a JSON trailer with the
    conversation metadata appended to the stream.
    """
    logger.info(f"Received chat request: {req}")
    if not user:
        await handle_session(request)

    conversation = None
    if user:
        title = req.title or (
            req.message[:50] + "..."
            if len(req.message) > 50
            else req.message or "Untitled Conversation"
        )
        conversation = await _get_or_create_conversation(db, user, title)
        user_msg = Message(
            role="user", content=req.message, conversation_id=conversation.id
        )
        db.add(user_msg)
        await db.commit()

    preferred_model = user.preferred_model if user else None
    model_name, api_endpoint = select_model(
        req.message, input_type="text", preferred_model=preferred_model
    )
    is_available, api_key, selected_endpoint = check_model_availability(
        model_name, HF_TOKEN
    )
    if not is_available:
        logger.warning(
            f"Model {model_name} is not available at {api_endpoint}, trying fallback model."
        )
        model_name = SECONDARY_MODEL_NAME
        is_available, api_key, selected_endpoint = check_model_availability(
            model_name, HF_TOKEN
        )
        if not is_available:
            logger.error(
                f"Fallback model {model_name} is not available at {selected_endpoint}"
            )
            raise HTTPException(
                status_code=503,
                detail=f"No available models. Tried {MODEL_NAME} and {SECONDARY_MODEL_NAME}.",
            )

    system_prompt = enhance_system_prompt(req.system_prompt, req.message, user)
    stream = request_generation(
        api_key=api_key,
        api_base=selected_endpoint,
        message=req.message,
        system_prompt=system_prompt,
        model_name=model_name,
        chat_history=req.history,
        temperature=req.temperature,
        max_new_tokens=req.max_new_tokens or 2048,
        deep_search=req.enable_browsing,
        input_type="text",
        output_format=req.output_format,
    )

    if req.output_format == "audio":
        # NOTE(review): `stream` appears to be a synchronous generator;
        # iterating it here blocks the event loop — confirm and consider
        # asyncio.to_thread if it matters.
        audio_chunks = []
        try:
            for chunk in stream:
                logger.debug(
                    f"Processing audio chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}"
                )
                if isinstance(chunk, bytes):
                    audio_chunks.append(chunk)
                else:
                    logger.warning(f"Unexpected non-bytes chunk in audio stream: {chunk}")
            if not audio_chunks:
                logger.error("No audio data generated.")
                raise HTTPException(
                    status_code=502,
                    detail="No audio data generated. Model may be unavailable.",
                )
            audio_data = b"".join(audio_chunks)
            return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav")
        except HTTPException:
            # Do not re-wrap our own HTTP errors into a generic 502.
            raise
        except Exception as e:
            logger.error(f"Audio generation failed: {e}")
            raise HTTPException(
                status_code=502, detail=f"Audio generation failed: {str(e)}"
            )

    async def stream_response():
        # BUGFIX: the original reassigned `stream`, `model_name`, `api_key`
        # and `selected_endpoint` inside this nested coroutine, turning them
        # into locals and raising UnboundLocalError on the very first read.
        # The fallback path now uses fresh `fb_*` names; enclosing-scope
        # variables are only ever read here.
        response_chunks = []
        try:
            for chunk in stream:
                if (
                    isinstance(chunk, str)
                    and chunk.strip()
                    and chunk not in ["analysis", "assistantfinal"]
                ):
                    response_chunks.append(chunk)
                    yield chunk.encode('utf-8')  # forward the chunk immediately
                    await asyncio.sleep(0.05)  # small delay to simulate typing
                else:
                    logger.warning(f"Skipping chunk: {chunk}")
            response = "".join(response_chunks)
            if not response.strip():
                logger.warning(
                    f"Empty response from {model_name}. Trying fallback model {SECONDARY_MODEL_NAME}."
                )
                fb_model = SECONDARY_MODEL_NAME
                fb_available, fb_api_key, fb_endpoint = check_model_availability(
                    fb_model, HF_TOKEN
                )
                if not fb_available:
                    logger.error(
                        f"Fallback model {fb_model} is not available at {fb_endpoint}"
                    )
                    yield (
                        f"Error: No available models. Tried {MODEL_NAME} and "
                        f"{SECONDARY_MODEL_NAME}."
                    ).encode('utf-8')
                    return
                fb_stream = request_generation(
                    api_key=fb_api_key,
                    api_base=fb_endpoint,
                    message=req.message,
                    system_prompt=system_prompt,
                    model_name=fb_model,
                    chat_history=req.history,
                    temperature=req.temperature,
                    max_new_tokens=req.max_new_tokens or 2048,
                    deep_search=req.enable_browsing,
                    input_type="text",
                    output_format=req.output_format,
                )
                response_chunks = []
                for chunk in fb_stream:
                    if (
                        isinstance(chunk, str)
                        and chunk.strip()
                        and chunk not in ["analysis", "assistantfinal"]
                    ):
                        response_chunks.append(chunk)
                        yield chunk.encode('utf-8')
                        await asyncio.sleep(0.05)
                    else:
                        logger.warning(f"Skipping fallback chunk: {chunk}")
                response = "".join(response_chunks)
                if not response.strip():
                    logger.error(f"Empty response from fallback model {fb_model}.")
                    yield (
                        f"Error: Empty response from both {MODEL_NAME} and "
                        f"{SECONDARY_MODEL_NAME}."
                    ).encode('utf-8')
                    return
            if user and conversation:
                assistant_msg = Message(
                    role="assistant", content=response, conversation_id=conversation.id
                )
                db.add(assistant_msg)
                await db.commit()
                conversation.updated_at = datetime.utcnow()
                await db.commit()
                # Trailer so the client can link the stream to a conversation.
                yield json.dumps(
                    {
                        "conversation_id": conversation.conversation_id,
                        "conversation_url": f"https://mgzon-mgzon-app.hf.space/chat/{conversation.conversation_id}",
                        "conversation_title": conversation.title,
                    },
                    ensure_ascii=False,
                ).encode('utf-8')
        except Exception as e:
            logger.error(f"Chat generation failed: {e}")
            yield f"Error: Chat generation failed: {str(e)}".encode('utf-8')

    return StreamingResponse(stream_response(), media_type="text/plain")


@router.post("/api/image-generation")
async def image_generation_endpoint(
    request: Request,
    req: dict,
    file: Optional[UploadFile] = File(None),
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Generate an image (PNG) or a textual description from a prompt.

    NOTE(review): a JSON body (`req: dict`) combined with an optional
    multipart `File` in one endpoint is unusual — verify clients can actually
    send both; FastAPI normally expects Form fields alongside files.
    """
    if not user:
        await handle_session(request)
    prompt = req.get("prompt", "")
    output_format = req.get("output_format", "image")
    if not prompt.strip():
        raise HTTPException(
            status_code=400, detail="Prompt is required for image generation."
        )

    model_name, api_endpoint = select_model(prompt, input_type="image_gen")
    is_available, api_key, selected_endpoint = check_model_availability(
        model_name, HF_TOKEN
    )
    if not is_available:
        logger.error(f"Model {model_name} is not available at {api_endpoint}")
        raise HTTPException(
            status_code=503,
            detail=f"Model {model_name} is not available. Please try another model.",
        )

    image_data = None
    if file:
        image_data = await file.read()

    system_prompt = enhance_system_prompt(
        "You are an expert in generating high-quality images based on detailed prompts. "
        "Ensure the output is visually appealing and matches the user's description.",
        prompt,
        user,
    )
    stream = request_generation(
        api_key=api_key,
        api_base=selected_endpoint,
        message=prompt,
        system_prompt=system_prompt,
        model_name=model_name,
        temperature=0.7,
        max_new_tokens=2048,
        input_type="image_gen",
        image_data=image_data,
        output_format=output_format,
    )

    if output_format == "image":
        image_chunks = []
        try:
            for chunk in stream:
                logger.debug(
                    f"Processing image chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}"
                )
                if isinstance(chunk, bytes):
                    image_chunks.append(chunk)
                else:
                    logger.warning(f"Unexpected non-bytes chunk in image stream: {chunk}")
            if not image_chunks:
                logger.error("No image data generated.")
                raise HTTPException(
                    status_code=500,
                    detail="No image data generated for image generation.",
                )
            image_data = b"".join(image_chunks)
            return StreamingResponse(io.BytesIO(image_data), media_type="image/png")
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            raise HTTPException(
                status_code=500, detail=f"Image generation failed: {str(e)}"
            )

    # Text output: collect string chunks, skipping reasoning markers.
    response_chunks = []
    try:
        for chunk in stream:
            logger.debug(f"Processing text chunk: {chunk[:100]}...")
            if (
                isinstance(chunk, str)
                and chunk.strip()
                and chunk not in ["analysis", "assistantfinal"]
            ):
                response_chunks.append(chunk)
            else:
                logger.warning(f"Skipping chunk: {chunk}")
        response = "".join(response_chunks)
        if not response.strip():
            logger.error("Empty response generated.")
            raise HTTPException(
                status_code=500, detail="Empty response generated from model."
            )
        return {"response": response}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Image generation failed: {e}")
        raise HTTPException(
            status_code=500, detail=f"Image generation failed: {str(e)}"
        )


@router.post("/api/audio-transcription")
async def audio_transcription_endpoint(
    request: Request,
    file: UploadFile = File(...),
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Transcribe an uploaded audio file; persists to the user's conversation."""
    logger.info(f"Received audio transcription request for file: {file.filename}")
    if not user:
        await handle_session(request)

    conversation = None
    if user:
        conversation = await _get_or_create_conversation(db, user, "Audio Transcription")
        user_msg = Message(
            role="user", content="Audio message", conversation_id=conversation.id
        )
        db.add(user_msg)
        await db.commit()

    model_name, api_endpoint = select_model("transcribe audio", input_type="audio")
    is_available, api_key, selected_endpoint = check_model_availability(
        model_name, HF_TOKEN
    )
    if not is_available:
        logger.error(f"Model {model_name} is not available at {api_endpoint}")
        raise HTTPException(
            status_code=503,
            detail=f"Model {model_name} is not available. Please try another model.",
        )

    audio_data = await file.read()
    stream = request_generation(
        api_key=api_key,
        api_base=selected_endpoint,
        message="Transcribe audio",
        system_prompt=(
            "Transcribe the provided audio using Whisper. Ensure accurate "
            "transcription in the detected language."
        ),
        model_name=model_name,
        temperature=0.7,
        max_new_tokens=2048,
        input_type="audio",
        audio_data=audio_data,
        output_format="text",
    )

    response_chunks = []
    try:
        for chunk in stream:
            logger.debug(f"Processing transcription chunk: {chunk[:100]}...")
            if isinstance(chunk, str):
                response_chunks.append(chunk)
            else:
                logger.warning(
                    f"Unexpected non-string chunk in transcription stream: {chunk}"
                )
        response = "".join(response_chunks)
        if not response.strip():
            logger.error("Empty transcription generated.")
            raise HTTPException(
                status_code=500, detail="Empty transcription generated from model."
            )
        logger.info(f"Audio transcription response: {response[:100]}...")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Audio transcription failed: {e}")
        raise HTTPException(
            status_code=500, detail=f"Audio transcription failed: {str(e)}"
        )

    if user and conversation:
        assistant_msg = Message(
            role="assistant", content=response, conversation_id=conversation.id
        )
        db.add(assistant_msg)
        await db.commit()
        conversation.updated_at = datetime.utcnow()
        await db.commit()
        return {
            "transcription": response,
            "conversation_id": conversation.conversation_id,
            "conversation_url": f"https://mgzon-mgzon-app.hf.space/chat/{conversation.conversation_id}",
            "conversation_title": conversation.title,
        }
    return {"transcription": response}


@router.post("/api/text-to-speech")
async def text_to_speech_endpoint(
    request: Request,
    req: dict,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Convert text to a WAV audio stream."""
    if not user:
        await handle_session(request)
    text = req.get("text", "")
    if not text.strip():
        raise HTTPException(
            status_code=400, detail="Text input is required for text-to-speech."
        )

    model_name, api_endpoint = select_model("text to speech", input_type="tts")
    is_available, api_key, selected_endpoint = check_model_availability(
        model_name, HF_TOKEN
    )
    if not is_available:
        logger.error(f"Model {model_name} is not available at {api_endpoint}")
        raise HTTPException(
            status_code=503,
            detail=f"Model {model_name} is not available. Please try another model.",
        )

    stream = request_generation(
        api_key=api_key,
        api_base=selected_endpoint,
        message=text,
        system_prompt=(
            "Convert the provided text to speech using a text-to-speech model. "
            "Ensure clear and natural pronunciation, especially for Arabic text."
        ),
        model_name=model_name,
        temperature=0.7,
        max_new_tokens=2048,
        input_type="tts",
        output_format="audio",
    )

    audio_chunks = []
    try:
        for chunk in stream:
            logger.debug(
                f"Processing TTS chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}"
            )
            if isinstance(chunk, bytes):
                audio_chunks.append(chunk)
            else:
                logger.warning(f"Unexpected non-bytes chunk in TTS stream: {chunk}")
        if not audio_chunks:
            logger.error("No audio data generated for TTS.")
            raise HTTPException(
                status_code=500, detail="No audio data generated for text-to-speech."
            )
        audio_data = b"".join(audio_chunks)
        return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Text-to-speech generation failed: {e}")
        raise HTTPException(
            status_code=500, detail=f"Text-to-speech generation failed: {str(e)}"
        )


@router.post("/api/code")
async def code_endpoint(
    request: Request,
    req: dict,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Generate code for a task description, optionally extending existing code."""
    if not user:
        await handle_session(request)
    framework = req.get("framework")
    task = req.get("task")
    code = req.get("code", "")
    output_format = req.get("output_format", "text")
    if not task:
        raise HTTPException(status_code=400, detail="Task description is required.")

    prompt = f"Generate code for task: {task} using {framework}. Existing code: {code}"
    preferred_model = user.preferred_model if user else None
    model_name, api_endpoint = select_model(
        prompt, input_type="text", preferred_model=preferred_model
    )
    is_available, api_key, selected_endpoint = check_model_availability(
        model_name, HF_TOKEN
    )
    if not is_available:
        logger.error(f"Model {model_name} is not available at {api_endpoint}")
        raise HTTPException(
            status_code=503,
            detail=f"Model {model_name} is not available. Please try another model.",
        )

    system_prompt = enhance_system_prompt(
        "You are a coding expert. Provide detailed, well-commented code with "
        "examples and explanations.",
        prompt,
        user,
    )
    stream = request_generation(
        api_key=api_key,
        api_base=selected_endpoint,
        message=prompt,
        system_prompt=system_prompt,
        model_name=model_name,
        temperature=0.7,
        max_new_tokens=2048,
        input_type="text",
        output_format=output_format,
    )

    if output_format == "audio":
        audio_chunks = []
        try:
            for chunk in stream:
                logger.debug(
                    f"Processing code audio chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}"
                )
                if isinstance(chunk, bytes):
                    audio_chunks.append(chunk)
                else:
                    logger.warning(
                        f"Unexpected non-bytes chunk in code audio stream: {chunk}"
                    )
            if not audio_chunks:
                logger.error("No audio data generated for code.")
                raise HTTPException(
                    status_code=500, detail="No audio data generated for code."
                )
            audio_data = b"".join(audio_chunks)
            return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav")
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Code audio generation failed: {e}")
            raise HTTPException(
                status_code=500, detail=f"Code audio generation failed: {str(e)}"
            )

    response_chunks = []
    try:
        for chunk in stream:
            logger.debug(f"Processing code text chunk: {chunk[:100]}...")
            if (
                isinstance(chunk, str)
                and chunk.strip()
                and chunk not in ["analysis", "assistantfinal"]
            ):
                response_chunks.append(chunk)
            else:
                logger.warning(f"Skipping code chunk: {chunk}")
        response = "".join(response_chunks)
        if not response.strip():
            logger.error("Empty code response generated.")
            raise HTTPException(
                status_code=500, detail="Empty code response generated from model."
            )
        return {"generated_code": response}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Code generation failed: {e}")
        raise HTTPException(
            status_code=500, detail=f"Code generation failed: {str(e)}"
        )


@router.post("/api/analysis")
async def analysis_endpoint(
    request: Request,
    req: dict,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Run expert analysis over a text input; text or audio output."""
    if not user:
        await handle_session(request)
    message = req.get("text", "")
    output_format = req.get("output_format", "text")
    if not message.strip():
        raise HTTPException(
            status_code=400, detail="Text input is required for analysis."
        )

    preferred_model = user.preferred_model if user else None
    model_name, api_endpoint = select_model(
        message, input_type="text", preferred_model=preferred_model
    )
    is_available, api_key, selected_endpoint = check_model_availability(
        model_name, HF_TOKEN
    )
    if not is_available:
        logger.error(f"Model {model_name} is not available at {api_endpoint}")
        raise HTTPException(
            status_code=503,
            detail=f"Model {model_name} is not available. Please try another model.",
        )

    system_prompt = enhance_system_prompt(
        "You are an expert analyst. Provide detailed analysis with step-by-step "
        "reasoning and examples.",
        message,
        user,
    )
    stream = request_generation(
        api_key=api_key,
        api_base=selected_endpoint,
        message=message,
        system_prompt=system_prompt,
        model_name=model_name,
        temperature=0.7,
        max_new_tokens=2048,
        input_type="text",
        output_format=output_format,
    )

    if output_format == "audio":
        audio_chunks = []
        try:
            for chunk in stream:
                logger.debug(
                    f"Processing analysis audio chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}"
                )
                if isinstance(chunk, bytes):
                    audio_chunks.append(chunk)
                else:
                    logger.warning(
                        f"Unexpected non-bytes chunk in analysis audio stream: {chunk}"
                    )
            if not audio_chunks:
                logger.error("No audio data generated for analysis.")
                raise HTTPException(
                    status_code=500, detail="No audio data generated for analysis."
                )
            audio_data = b"".join(audio_chunks)
            return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav")
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Analysis audio generation failed: {e}")
            raise HTTPException(
                status_code=500, detail=f"Analysis audio generation failed: {str(e)}"
            )

    response_chunks = []
    try:
        for chunk in stream:
            logger.debug(f"Processing analysis text chunk: {chunk[:100]}...")
            if (
                isinstance(chunk, str)
                and chunk.strip()
                and chunk not in ["analysis", "assistantfinal"]
            ):
                response_chunks.append(chunk)
            else:
                logger.warning(f"Skipping analysis chunk: {chunk}")
        response = "".join(response_chunks)
        if not response.strip():
            logger.error("Empty analysis response generated.")
            raise HTTPException(
                status_code=500, detail="Empty analysis response generated from model."
            )
        return {"analysis": response}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Analysis generation failed: {e}")
        raise HTTPException(
            status_code=500, detail=f"Analysis generation failed: {str(e)}"
        )


@router.post("/api/image-analysis")
async def image_analysis_endpoint(
    request: Request,
    file: UploadFile = File(...),
    output_format: str = "text",
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Analyze an uploaded image; persists the result to the user's conversation."""
    if not user:
        await handle_session(request)

    conversation = None
    if user:
        conversation = await _get_or_create_conversation(db, user, "Image Analysis")
        user_msg = Message(
            role="user", content="Image analysis request", conversation_id=conversation.id
        )
        db.add(user_msg)
        await db.commit()

    preferred_model = user.preferred_model if user else None
    model_name, api_endpoint = select_model(
        "analyze image", input_type="image", preferred_model=preferred_model
    )
    is_available, api_key, selected_endpoint = check_model_availability(
        model_name, HF_TOKEN
    )
    if not is_available:
        logger.error(f"Model {model_name} is not available at {api_endpoint}")
        raise HTTPException(
            status_code=503,
            detail=f"Model {model_name} is not available. Please try another model.",
        )

    image_data = await file.read()
    system_prompt = enhance_system_prompt(
        "You are an expert in image analysis. Provide detailed descriptions or "
        "classifications based on the query.",
        "Analyze this image",
        user,
    )
    stream = request_generation(
        api_key=api_key,
        api_base=selected_endpoint,
        message="Analyze this image",
        system_prompt=system_prompt,
        model_name=model_name,
        temperature=0.7,
        max_new_tokens=2048,
        input_type="image",
        image_data=image_data,
        output_format=output_format,
    )

    if output_format == "audio":
        audio_chunks = []
        try:
            for chunk in stream:
                logger.debug(
                    f"Processing image analysis audio chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}"
                )
                if isinstance(chunk, bytes):
                    audio_chunks.append(chunk)
                else:
                    logger.warning(
                        f"Unexpected non-bytes chunk in image analysis audio stream: {chunk}"
                    )
            if not audio_chunks:
                logger.error("No audio data generated for image analysis.")
                raise HTTPException(
                    status_code=500,
                    detail="No audio data generated for image analysis.",
                )
            audio_data = b"".join(audio_chunks)
            return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav")
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Image analysis audio generation failed: {e}")
            raise HTTPException(
                status_code=500,
                detail=f"Image analysis audio generation failed: {str(e)}",
            )

    response_chunks = []
    try:
        for chunk in stream:
            logger.debug(f"Processing image analysis text chunk: {chunk[:100]}...")
            if (
                isinstance(chunk, str)
                and chunk.strip()
                and chunk not in ["analysis", "assistantfinal"]
            ):
                response_chunks.append(chunk)
            else:
                logger.warning(f"Skipping image analysis chunk: {chunk}")
        response = "".join(response_chunks)
        if not response.strip():
            logger.error("Empty image analysis response generated.")
            raise HTTPException(
                status_code=500,
                detail="Empty image analysis response generated from model.",
            )
        if user and conversation:
            assistant_msg = Message(
                role="assistant", content=response, conversation_id=conversation.id
            )
            db.add(assistant_msg)
            await db.commit()
            conversation.updated_at = datetime.utcnow()
            await db.commit()
            return {
                "image_analysis": response,
                "conversation_id": conversation.conversation_id,
                "conversation_url": f"https://mgzon-mgzon-app.hf.space/chat/{conversation.conversation_id}",
                "conversation_title": conversation.title,
            }
        return {"image_analysis": response}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Image analysis failed: {e}")
        raise HTTPException(
            status_code=500, detail=f"Image analysis failed: {str(e)}"
        )


@router.get("/api/test-model")
async def test_model(model: str = MODEL_NAME, endpoint: str = API_ENDPOINT):
    """Smoke-test a model with a one-word prompt and return its reply."""
    try:
        is_available, api_key, selected_endpoint = check_model_availability(
            model, HF_TOKEN
        )
        if not is_available:
            logger.error(f"Model {model} is not available at {endpoint}")
            raise HTTPException(
                status_code=503, detail=f"Model {model} is not available."
            )
        client = OpenAI(api_key=api_key, base_url=selected_endpoint, timeout=60.0)
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": "Test"}],
            max_tokens=50,
        )
        logger.debug(f"Test model response: {response.choices[0].message.content}")
        return {"status": "success", "response": response.choices[0].message.content}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Test model failed: {e}")
        raise HTTPException(status_code=500, detail=f"Test model failed: {str(e)}")


@router.post("/api/conversations", response_model=ConversationOut)
async def create_conversation(
    req: ConversationCreate,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Create a new (empty) conversation for the logged-in user."""
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    conversation = Conversation(
        conversation_id=str(uuid.uuid4()),
        title=req.title or "Untitled Conversation",
        user_id=user.id,
    )
    db.add(conversation)
    await db.commit()
    await db.refresh(conversation)
    return ConversationOut.from_orm(conversation)


@router.get("/api/conversations/{conversation_id}", response_model=ConversationOut)
async def get_conversation(
    conversation_id: str,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Fetch one conversation belonging to the logged-in user."""
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    result = await db.execute(
        select(Conversation).filter(
            Conversation.conversation_id == conversation_id,
            Conversation.user_id == user.id,
        )
    )
    conversation = result.scalar_one_or_none()
    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return ConversationOut.from_orm(conversation)


@router.get("/api/conversations", response_model=List[ConversationOut])
async def list_conversations(
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """List the user's conversations, newest first (by creation time)."""
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    result = await db.execute(
        select(Conversation)
        .filter(Conversation.user_id == user.id)
        .order_by(Conversation.created_at.desc())
    )
    conversations = result.scalars().all()
    return [ConversationOut.from_orm(conv) for conv in conversations]


@router.put("/api/conversations/{conversation_id}/title")
async def update_conversation_title(
    conversation_id: str,
    title: str,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Rename a conversation owned by the logged-in user."""
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    result = await db.execute(
        select(Conversation).filter(
            Conversation.conversation_id == conversation_id,
            Conversation.user_id == user.id,
        )
    )
    conversation = result.scalar_one_or_none()
    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")
    conversation.title = title
    conversation.updated_at = datetime.utcnow()
    await db.commit()
    return {"message": "Conversation title updated", "title": conversation.title}


@router.delete("/api/conversations/{conversation_id}")
async def delete_conversation(
    conversation_id: str,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete a conversation and all of its messages."""
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    result = await db.execute(
        select(Conversation).filter(
            Conversation.conversation_id == conversation_id,
            Conversation.user_id == user.id,
        )
    )
    conversation = result.scalar_one_or_none()
    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")
    # Messages reference the conversation's integer primary key, not its UUID.
    await db.execute(delete(Message).filter(Message.conversation_id == conversation.id))
    await db.delete(conversation)
    await db.commit()
    return {"message": "Conversation deleted successfully"}


@router.get("/users/me")
async def get_user_settings(user: User = Depends(current_active_user)):
    """Return the logged-in user's profile and settings."""
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    return {
        "id": user.id,
        "email": user.email,
        "display_name": user.display_name,
        "preferred_model": user.preferred_model,
        "job_title": user.job_title,
        "education": user.education,
        "interests": user.interests,
        "additional_info": user.additional_info,
        "conversation_style": user.conversation_style,
        "is_active": user.is_active,
        "is_superuser": user.is_superuser,
    }


@router.get("/api/verify-token")
async def verify_token(user: User = Depends(current_active_user)):
    """Validate the caller's auth token and echo basic account info."""
    if not user:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return {
        "status": "valid",
        "user": {
            "id": user.id,
            "email": user.email,
            "is_active": user.is_active,
        },
    }


@router.put("/users/me")
async def update_user_settings(
    settings: UserUpdate,
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Partially update the logged-in user's settings (None fields untouched)."""
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    if settings.preferred_model and settings.preferred_model not in MODEL_ALIASES:
        raise HTTPException(status_code=400, detail="Invalid model alias")
    if settings.display_name is not None:
        user.display_name = settings.display_name
    if settings.preferred_model is not None:
        user.preferred_model = settings.preferred_model
    if settings.job_title is not None:
        user.job_title = settings.job_title
    if settings.education is not None:
        user.education = settings.education
    if settings.interests is not None:
        user.interests = settings.interests
    if settings.additional_info is not None:
        user.additional_info = settings.additional_info
    if settings.conversation_style is not None:
        user.conversation_style = settings.conversation_style
    await db.commit()
    await db.refresh(user)
    return {
        "message": "Settings updated successfully",
        "user": {
            "id": user.id,
            "email": user.email,
            "display_name": user.display_name,
            "preferred_model": user.preferred_model,
            "job_title": user.job_title,
            "education": user.education,
            "interests": user.interests,
            "additional_info": user.additional_info,
            "conversation_style": user.conversation_style,
            "is_active": user.is_active,
            "is_superuser": user.is_superuser,
        },
    }


@router.post("/api/conversations/sync", response_model=ConversationOut)
async def sync_conversation(
    request: Request,
    payload: dict = Body(...),
    user: User = Depends(current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Replace a conversation's messages with a client-supplied transcript.

    If ``conversation_id`` is present the existing conversation is retitled and
    its messages replaced; otherwise a new conversation is created.
    """
    if not user:
        raise HTTPException(status_code=401, detail="Login required")
    messages = payload.get("messages", [])
    title = payload.get("title", "Untitled Conversation")
    conversation_id = payload.get("conversation_id")
    logger.info(
        f"Syncing conversation for user {user.email}, conversation_id: {conversation_id}"
    )
    try:
        if conversation_id:
            result = await db.execute(
                select(Conversation).filter(
                    Conversation.conversation_id == conversation_id,
                    Conversation.user_id == user.id,
                )
            )
            conversation = result.scalar_one_or_none()
            if not conversation:
                raise HTTPException(status_code=404, detail="Conversation not found")
            # Update the existing conversation in place.
            conversation.title = title
            conversation.updated_at = datetime.utcnow()
            # Replace the message history wholesale.
            await db.execute(
                delete(Message).filter(Message.conversation_id == conversation.id)
            )
            for msg in messages:
                db.add(
                    Message(
                        conversation_id=conversation.id,
                        role=msg.get("role", "user"),
                        content=msg.get("content", ""),
                        created_at=datetime.utcnow(),
                    )
                )
            await db.commit()
            await db.refresh(conversation)
            logger.info(f"Updated conversation {conversation_id} for user {user.email}")
        else:
            # Create a new conversation from the supplied transcript.
            conversation_id = str(uuid.uuid4())
            conversation = Conversation(
                conversation_id=conversation_id,
                user_id=user.id,
                title=title,
                created_at=datetime.utcnow(),
                updated_at=datetime.utcnow(),
            )
            db.add(conversation)
            await db.commit()
            await db.refresh(conversation)
            for msg in messages:
                db.add(
                    Message(
                        conversation_id=conversation.id,
                        role=msg.get("role", "user"),
                        content=msg.get("content", ""),
                        created_at=datetime.utcnow(),
                    )
                )
            await db.commit()
            logger.info(
                f"Created new conversation {conversation_id} for user {user.email}"
            )
        return ConversationOut.from_orm(conversation)
    except HTTPException:
        # BUGFIX: the original generic handler re-wrapped the 404 above as a 500.
        raise
    except Exception as e:
        logger.error(f"Error syncing conversation: {str(e)}")
        raise HTTPException(
            status_code=500, detail=f"Failed to sync conversation: {str(e)}"
        )