Spaces:
Build error
Build error
| """ | |
| AI Tool Hub - Main Application | |
| Integrates all components into a single FastAPI application | |
| """ | |
| import os | |
| import json | |
| import uuid | |
| import secrets | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, List, Optional, Union | |
| # Load environment variables from .env file | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from fastapi import FastAPI, Request, Response, Depends, HTTPException, Form, Cookie, status | |
| from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.security import APIKeyHeader, HTTPBearer, HTTPAuthorizationCredentials | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| from sqlalchemy import create_engine | |
| from sqlalchemy.orm import sessionmaker | |
| # Import provider modules | |
| from providers import get_provider, HuggingFaceProvider, OpenAIProvider, DeepSeekProvider, OpenRouterProvider | |
| # Import prompt modules | |
| from prompts import PromptTemplate, PromptTemplateManager, PromptMarketplace | |
| # Import ads module | |
| from ads import GoogleAdsManager | |
| # Import additional modules | |
| import random | |
| import time | |
| import hashlib | |
| # Import the Base from models.py | |
| from models import Base | |
| # Import the TOOLS from tools.py | |
| from tools import TOOLS | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("app") | |
| # Database setup | |
| DATABASE_URL = "sqlite:///./magicAi.db" # Path to your SQLite database | |
| engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) | |
| SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
| # Create the database tables | |
| Base.metadata.create_all(bind=engine) | |
| # Create the FastAPI app | |
| app = FastAPI( | |
| title="AI Tool Hub", | |
| description="A platform for using AI models with various tools and prompt templates", | |
| version="1.0.0" | |
| ) | |
| # Add CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Mount static files | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| # Set up Jinja2 templates | |
| templates = Jinja2Templates(directory="templates") | |
| # Initialize components | |
| template_manager = PromptTemplateManager() | |
| marketplace = PromptMarketplace() | |
| ads_manager = GoogleAdsManager() | |
| # Security | |
| API_KEY_NAME = "X-API-Key" | |
| api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) | |
| security = HTTPBearer(auto_error=False) | |
| # In-memory store of API keys for demo purpose | |
| # In production, these would be stored in a database | |
| API_KEYS = {} | |
| # In-memory session store for demo purpose | |
| # In production, these would be stored in a database or Redis | |
| SESSIONS = {} | |
| # Add new models for ad rewards | |
| class AdRewardRequest(BaseModel): | |
| ad_type: str # 'daily' or 'special' | |
| impression_id: Optional[str] = None | |
| class DailyReward(BaseModel): | |
| user_id: str | |
| date: str | |
| count: int = 0 | |
| last_reward: Optional[str] = None | |
| class SpecialReward(BaseModel): | |
| user_id: str | |
| date: str | |
| claimed: bool = False | |
| claimed_at: Optional[str] = None | |
| # In-memory stores for rewards (in production, use a database) | |
| DAILY_REWARDS = {} | |
| SPECIAL_REWARDS = {} | |
| # Models | |
| class CreditUpdateRequest(BaseModel): | |
| user_id: str | |
| amount: float | |
| reason: str = "manual" | |
| class GenerateRequest(BaseModel): | |
| prompt: str | |
| model: str | |
| provider: str | |
| system_message: Optional[str] = None | |
| max_tokens: Optional[int] = 1000 | |
| temperature: Optional[float] = 0.7 | |
| prompt_template_id: Optional[str] = None | |
| template_variables: Optional[Dict[str, Any]] = None | |
| class ImageGenerateRequest(BaseModel): | |
| prompt: str | |
| provider: str = "openai" | |
| model: Optional[str] = None | |
| size: Optional[str] = "1024x1024" | |
| prompt_template_id: Optional[str] = None | |
| template_variables: Optional[Dict[str, Any]] = None | |
| class UserInfo(BaseModel): | |
| id: str | |
| username: str | |
| email: str | |
| credits: float = 10.0 # New users start with some free credits | |
| created_at: str = Field(default_factory=lambda: datetime.now().isoformat()) | |
| last_login: str = Field(default_factory=lambda: datetime.now().isoformat()) | |
| role: str = "user" | |
| # Models for prompt system | |
| class Prompt(BaseModel): | |
| id: str | |
| tool_id: str | |
| title: str | |
| content: str | |
| description: Optional[str] = None | |
| creator_id: Optional[str] = None | |
| is_default: bool = False | |
| usage_count: int = 0 | |
| avg_rating: float = 0.0 | |
| created_at: str = None | |
| updated_at: str = None | |
| tags: List[str] = [] | |
| class PromptRating(BaseModel): | |
| prompt_id: str | |
| user_id: str | |
| rating: int # 1-5 | |
| created_at: str | |
| class UserPromptHistory(BaseModel): | |
| user_id: str | |
| prompt_id: str | |
| tool_id: str | |
| used_at: str | |
| was_modified: bool = False | |
| modifications: Optional[str] = None | |
| # Mock database (replace with actual DynamoDB integration) | |
| PROMPTS = {} | |
| PROMPT_RATINGS = [] | |
| USER_PROMPT_HISTORY = [] | |
| # Initialize default prompts | |
| def init_default_prompts(): | |
| default_prompts = [ | |
| { | |
| "id": "text-summary-default", | |
| "tool_id": "text-summary", | |
| "title": "Concise Text Summary", | |
| "content": "Summarize the following text in 3-5 bullet points highlighting the main ideas: {{input}}", | |
| "description": "Creates a bulleted summary of any text", | |
| "is_default": True, | |
| "tags": ["summary", "concise", "bullets"] | |
| }, | |
| { | |
| "id": "image-portrait-default", | |
| "tool_id": "image-generator", | |
| "title": "Professional Portrait", | |
| "content": "Create a professional portrait photo of {{subject}}, high quality, studio lighting, detailed features, professional attire", | |
| "description": "Generates professional-looking portrait images", | |
| "is_default": True, | |
| "tags": ["portrait", "professional", "photo"] | |
| }, | |
| { | |
| "id": "text-blog-default", | |
| "tool_id": "text-generator", | |
| "title": "Blog Post Generator", | |
| "content": "Write a comprehensive blog post about {{topic}}. Include an introduction, 3-5 main sections with subheadings, and a conclusion. Use a conversational tone and include practical examples where appropriate.", | |
| "description": "Creates complete blog posts with proper structure", | |
| "is_default": True, | |
| "tags": ["blog", "writing", "content"] | |
| }, | |
| { | |
| "id": "code-python-default", | |
| "tool_id": "code-generator", | |
| "title": "Python Function", | |
| "content": "Write a Python function that {{task}}. Include docstrings, type hints, error handling, and comments explaining your logic. Provide a small example of how to use the function.", | |
| "description": "Generates well-structured Python functions", | |
| "is_default": True, | |
| "tags": ["python", "function", "code"] | |
| } | |
| ] | |
| for prompt in default_prompts: | |
| prompt_obj = Prompt( | |
| id=prompt["id"], | |
| tool_id=prompt["tool_id"], | |
| title=prompt["title"], | |
| content=prompt["content"], | |
| description=prompt["description"], | |
| is_default=prompt["is_default"], | |
| tags=prompt["tags"], | |
| created_at=datetime.now().isoformat(), | |
| updated_at=datetime.now().isoformat() | |
| ) | |
| PROMPTS[prompt["id"]] = prompt_obj | |
| # Call initialization at startup | |
| init_default_prompts() | |
| # Helper functions for prompt system | |
| def get_prompts_for_tool(tool_id: str) -> List[Prompt]: | |
| """Get all prompts for a specific tool""" | |
| return [p for p in PROMPTS.values() if p.tool_id == tool_id] | |
| def get_default_prompts_for_tool(tool_id: str) -> List[Prompt]: | |
| """Get default prompts for a specific tool""" | |
| return [p for p in PROMPTS.values() if p.tool_id == tool_id and p.is_default] | |
| def get_trending_prompts(limit: int = 5) -> List[Prompt]: | |
| """Get trending prompts based on usage count and ratings""" | |
| sorted_prompts = sorted( | |
| PROMPTS.values(), | |
| key=lambda p: (p.usage_count * 0.7 + p.avg_rating * 0.3), | |
| reverse=True | |
| ) | |
| return sorted_prompts[:limit] | |
| def get_personalized_prompts(user_id: str, tool_id: str, limit: int = 3) -> List[Prompt]: | |
| """Get personalized prompt recommendations for a user and tool""" | |
| # Get user history for this tool | |
| user_tool_history = [h for h in USER_PROMPT_HISTORY | |
| if h.user_id == user_id and h.tool_id == tool_id] | |
| if not user_tool_history: | |
| # If no history, return default prompts | |
| return get_default_prompts_for_tool(tool_id) | |
| # Count prompt usage | |
| prompt_usage = {} | |
| for history in user_tool_history: | |
| prompt_usage[history.prompt_id] = prompt_usage.get(history.prompt_id, 0) + 1 | |
| # Get most used prompts | |
| most_used_prompt_ids = sorted(prompt_usage.items(), key=lambda x: x[1], reverse=True) | |
| most_used_prompts = [PROMPTS.get(pid) for pid, _ in most_used_prompt_ids[:limit]] | |
| # If we don't have enough, add some default or trending ones | |
| if len(most_used_prompts) < limit: | |
| defaults = get_default_prompts_for_tool(tool_id) | |
| for default in defaults: | |
| if default not in most_used_prompts and len(most_used_prompts) < limit: | |
| most_used_prompts.append(default) | |
| return most_used_prompts | |
| def record_prompt_usage(user_id: str, prompt_id: str, tool_id: str, was_modified: bool = False, modifications: str = None): | |
| """Record that a user used a particular prompt""" | |
| # Update prompt usage count | |
| if prompt_id in PROMPTS: | |
| PROMPTS[prompt_id].usage_count += 1 | |
| PROMPTS[prompt_id].updated_at = datetime.now().isoformat() | |
| # Add to history | |
| history_entry = UserPromptHistory( | |
| user_id=user_id, | |
| prompt_id=prompt_id, | |
| tool_id=tool_id, | |
| used_at=datetime.now().isoformat(), | |
| was_modified=was_modified, | |
| modifications=modifications | |
| ) | |
| USER_PROMPT_HISTORY.append(history_entry) | |
| def rate_prompt(user_id: str, prompt_id: str, rating: int): | |
| """Add or update a rating for a prompt""" | |
| # Check if user already rated this prompt | |
| existing_rating = next((r for r in PROMPT_RATINGS if r.user_id == user_id and r.prompt_id == prompt_id), None) | |
| if existing_rating: | |
| # Update existing rating | |
| existing_rating.rating = rating | |
| existing_rating.created_at = datetime.now().isoformat() | |
| else: | |
| # Add new rating | |
| rating_obj = PromptRating( | |
| prompt_id=prompt_id, | |
| user_id=user_id, | |
| rating=rating, | |
| created_at=datetime.now().isoformat() | |
| ) | |
| PROMPT_RATINGS.append(rating_obj) | |
| # Update average rating on prompt | |
| if prompt_id in PROMPTS: | |
| prompt_ratings = [r.rating for r in PROMPT_RATINGS if r.prompt_id == prompt_id] | |
| if prompt_ratings: | |
| PROMPTS[prompt_id].avg_rating = sum(prompt_ratings) / len(prompt_ratings) | |
| PROMPTS[prompt_id].updated_at = datetime.now().isoformat() | |
| # Dependency to get the database session | |
| def get_db(): | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| # User authentication and session management | |
| def get_session_user(session_id: Optional[str] = Cookie(None)) -> Optional[UserInfo]: | |
| """Get the user from the session""" | |
| if not session_id or session_id not in SESSIONS: | |
| return None | |
| session = SESSIONS[session_id] | |
| if datetime.now() > session["expires"]: | |
| # Session expired | |
| del SESSIONS[session_id] | |
| return None | |
| # Update session expiration | |
| session["expires"] = datetime.now() + timedelta(hours=24) | |
| return session["user"] | |
| def require_session_user(admin_required: bool = False, session_user: Optional[UserInfo] = Depends(get_session_user)): | |
| """Require a logged-in user, optionally requiring admin role""" | |
| if not session_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Authentication required", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| if admin_required and getattr(session_user, "role", "") != "admin": | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Admin access required", | |
| ) | |
| return session_user | |
| def verify_api_key(api_key: str = Depends(api_key_header), | |
| credentials: HTTPAuthorizationCredentials = Depends(security)) -> UserInfo: | |
| """Verify API key from header or Bearer token""" | |
| # Try to get the API key from the header | |
| key = api_key | |
| # If not found, try to get it from the Bearer token | |
| if not key and credentials: | |
| key = credentials.credentials | |
| # Check if the key is valid | |
| if not key or key not in API_KEYS: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid API key", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| return API_KEYS[key]["user"] | |
| # Credit packages configuration | |
| CREDIT_PACKAGES = [ | |
| { | |
| "id": "starter", | |
| "name": "Starter Pack", | |
| "description": "Perfect for trying out our AI tools", | |
| "credits": 100, | |
| "price": 49.99 | |
| }, | |
| { | |
| "id": "pro", | |
| "name": "Pro Pack", | |
| "description": "Most popular choice for regular users", | |
| "credits": 500, | |
| "price": 199.99 | |
| }, | |
| { | |
| "id": "enterprise", | |
| "name": "Enterprise Pack", | |
| "description": "Best value for power users", | |
| "credits": 2000, | |
| "price": 690.99 | |
| } | |
| ] | |
| # Mock user data (replace with database in production) | |
| users = {} | |
| # Routes | |
| async def home(request: Request, session_user: Optional[UserInfo] = Depends(get_session_user)): | |
| """Home page with tools list""" | |
| return templates.TemplateResponse( | |
| "index.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "tools": TOOLS, | |
| "session_user": session_user, | |
| "user": session_user, # Add user for consistency with other templates | |
| "user_credits": session_user.credits if session_user else 0 | |
| } | |
| ) | |
| async def tool_page( | |
| tool_id: str, | |
| request: Request, | |
| session_user: Optional[UserInfo] = Depends(get_session_user) | |
| ): | |
| """Tool-specific page to execute a specific AI tool""" | |
| # Get the tool | |
| tool = next((t for t in TOOLS if t.id == tool_id), None) | |
| if not tool: | |
| return templates.TemplateResponse( | |
| "error.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error_title": "Tool Not Found", | |
| "error_description": "The requested tool does not exist.", | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0, | |
| "tools": TOOLS # Include for consistent sidebar navigation | |
| } | |
| ) | |
| # Get example prompts for this tool | |
| suggestions = tool.example_prompts if hasattr(tool, 'example_prompts') else [] | |
| # Check if the user has enough credits | |
| has_enough_credits = session_user and session_user.credits >= tool.cost if session_user else False | |
| # Get available providers directly from the tool | |
| providers = tool.providers | |
| return templates.TemplateResponse( | |
| "tool.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "tool": tool, | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0, | |
| "has_enough_credits": has_enough_credits, | |
| "suggestions": suggestions, | |
| "providers": providers, | |
| "tools": TOOLS # Include all tools for sidebar navigation | |
| } | |
| ) | |
| async def result_page( | |
| request: Request, | |
| result_id: str, | |
| session_user: Optional[UserInfo] = Depends(get_session_user) | |
| ): | |
| """Result display page""" | |
| # In a real app, you'd fetch the result from a database | |
| # Here, we'll use a mock result for demonstration | |
| result = { | |
| "id": result_id, | |
| "type": "text", # or "image" or "code" | |
| "provider": "openai", | |
| "model": "gpt-4", | |
| "prompt": "Write a short poem about AI", | |
| "result": """ | |
| Silicon dreams in neural space, | |
| Learning patterns, quickening pace. | |
| Algorithms dance with grace divine, | |
| In zeros and ones, intelligence shine. | |
| Human-made yet growing beyond, | |
| Of our creations, we grow fond. | |
| Partners in progress, AI and we, | |
| Crafting together what's yet to be. | |
| """, | |
| "created_at": datetime.now().isoformat(), | |
| "response_time": 2.3 # seconds | |
| } | |
| return templates.TemplateResponse( | |
| "result.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "result": result, | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0 | |
| } | |
| ) | |
| async def marketplace_page( | |
| request: Request, | |
| category: Optional[str] = None, | |
| sort_by: str = "popular", | |
| session_user: Optional[UserInfo] = Depends(get_session_user) | |
| ): | |
| """Prompt marketplace page""" | |
| prompts = marketplace.list_marketplace_prompts(category=category, sort_by=sort_by) | |
| return templates.TemplateResponse( | |
| "marketplace.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "prompts": prompts, | |
| "category": category, | |
| "sort_by": sort_by, | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0 | |
| } | |
| ) | |
| async def ad_page( | |
| request: Request, | |
| session_user: Optional[UserInfo] = Depends(get_session_user) | |
| ): | |
| """Ad reward page""" | |
| return templates.TemplateResponse( | |
| "ad_reward.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0 | |
| } | |
| ) | |
| async def login_page(request: Request): | |
| """Login page""" | |
| return templates.TemplateResponse( | |
| "login.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub" | |
| } | |
| ) | |
| async def login( | |
| request: Request, | |
| username: str = Form(...), | |
| password: str = Form(...) | |
| ): | |
| """Process login""" | |
| # Check if admin login | |
| if username == "admin": | |
| # Check admin credentials | |
| admin_key = API_KEYS.get("admin_key") | |
| if not admin_key or admin_key.get("password") != password: | |
| # Admin login failure | |
| return templates.TemplateResponse( | |
| "login.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error": "Invalid admin credentials" | |
| }, | |
| status_code=400 | |
| ) | |
| # Admin login success | |
| user = admin_key["user"] | |
| else: | |
| # For demo purposes, any username/password combo works | |
| # In production, you'd verify against a database | |
| # Create or update user | |
| user_id = f"user_{username.lower().replace(' ', '_')}" | |
| user = UserInfo( | |
| id=user_id, | |
| username=username, | |
| email=f"{username.lower().replace(' ', '.')}@example.com", | |
| last_login=datetime.now().isoformat() | |
| ) | |
| # Create session | |
| session_id = secrets.token_urlsafe(32) | |
| SESSIONS[session_id] = { | |
| "user": user, | |
| "expires": datetime.now() + timedelta(hours=24) | |
| } | |
| # Create redirect response | |
| response = RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) | |
| # Set session cookie | |
| response.set_cookie( | |
| key="session_id", | |
| value=session_id, | |
| max_age=86400, # 24 hours | |
| httponly=True, | |
| samesite="lax" | |
| ) | |
| return response | |
| async def logout( | |
| response: Response, | |
| session_id: Optional[str] = Cookie(None) | |
| ): | |
| """Process logout""" | |
| if session_id and session_id in SESSIONS: | |
| del SESSIONS[session_id] | |
| response.delete_cookie(key="session_id") | |
| return RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) | |
| async def credits_page(request: Request, session_user: Optional[UserInfo] = Depends(get_session_user)): | |
| """Credits management page""" | |
| if not session_user: | |
| return RedirectResponse(url="/login") | |
| # Get credit history (mock data - replace with database in production) | |
| credit_history = [ | |
| { | |
| "date": "2024-03-20 14:30", | |
| "type": "Daily Reward", | |
| "amount": 5, | |
| "details": "Watched daily ad" | |
| }, | |
| { | |
| "date": "2024-03-19 15:45", | |
| "type": "Tool Usage", | |
| "amount": -5, | |
| "details": "Used Image Generation" | |
| } | |
| ] | |
| return templates.TemplateResponse( | |
| "credits.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "user": session_user, | |
| "user_credits": session_user.credits, | |
| "credit_packages": CREDIT_PACKAGES, | |
| "credit_history": credit_history, | |
| "referral_link": f"{request.base_url}?ref={session_user.id}" | |
| } | |
| ) | |
| async def get_ads_status(request: Request, session_user: Optional[UserInfo] = Depends(get_session_user)): | |
| """Get user's ad reward status""" | |
| if not session_user: | |
| return JSONResponse( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| content={"success": False, "error": "Authentication required"} | |
| ) | |
| # Get daily reward status | |
| daily_reward = DAILY_REWARDS.get(f"{session_user.id}_{datetime.now().date().isoformat()}") | |
| can_claim_daily = True | |
| cooldown_remaining = 0 | |
| if daily_reward and daily_reward.last_reward: | |
| last_reward_time = datetime.fromisoformat(daily_reward.last_reward) | |
| if (datetime.now() - last_reward_time).total_seconds() < 3600: # 1 hour cooldown | |
| can_claim_daily = False | |
| cooldown_remaining = 3600 - (datetime.now() - last_reward_time).total_seconds() | |
| # Get special reward status | |
| special_reward = SPECIAL_REWARDS.get(f"{session_user.id}_{datetime.now().date().isoformat()}") | |
| can_claim_special = True | |
| if special_reward and special_reward.claimed: | |
| can_claim_special = False | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "daily_rewards": { | |
| "count": daily_reward.count if daily_reward else 0, | |
| "can_claim": can_claim_daily and (not daily_reward or daily_reward.count < 3), | |
| "cooldown_remaining": int(cooldown_remaining) | |
| }, | |
| "special_reward": { | |
| "claimed": special_reward.claimed if special_reward else False, | |
| "can_claim": can_claim_special | |
| } | |
| } | |
| ) | |
| async def claim_ad_reward( | |
| request: Request, | |
| session_user: Optional[UserInfo] = Depends(get_session_user) | |
| ): | |
| """Process ad reward claim""" | |
| if not session_user: | |
| return JSONResponse( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| content={"success": False, "error": "Authentication required"} | |
| ) | |
| data = await request.json() | |
| reward_type = data.get("type") | |
| if reward_type not in ["daily", "special"]: | |
| return JSONResponse( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| content={"success": False, "error": "Invalid reward type"} | |
| ) | |
| today = datetime.now().date().isoformat() | |
| if reward_type == "daily": | |
| # Get or create daily reward record | |
| daily_key = f"{session_user.id}_{today}" | |
| if daily_key not in DAILY_REWARDS: | |
| DAILY_REWARDS[daily_key] = DailyReward( | |
| user_id=session_user.id, | |
| date=today, | |
| count=0 | |
| ) | |
| daily_reward = DAILY_REWARDS[daily_key] | |
| # Check if user has already claimed maximum daily rewards | |
| if daily_reward.count >= 3: | |
| return JSONResponse( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| content={"success": False, "error": "Daily reward limit reached"} | |
| ) | |
| # Check cooldown period | |
| if daily_reward.last_reward: | |
| last_reward_time = datetime.fromisoformat(daily_reward.last_reward) | |
| if (datetime.now() - last_reward_time).total_seconds() < 3600: | |
| return JSONResponse( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| content={"success": False, "error": "Please wait 1 hour between rewards"} | |
| ) | |
| # Award credits | |
| reward_amount = 10 | |
| session_user.credits += reward_amount | |
| daily_reward.count += 1 | |
| daily_reward.last_reward = datetime.now().isoformat() | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "credits_earned": reward_amount, | |
| "current_credits": session_user.credits, | |
| "daily_progress": daily_reward.count | |
| } | |
| ) | |
| else: # special reward | |
| # Get or create special reward record | |
| special_key = f"{session_user.id}_{today}" | |
| if special_key not in SPECIAL_REWARDS: | |
| SPECIAL_REWARDS[special_key] = SpecialReward( | |
| user_id=session_user.id, | |
| date=today, | |
| claimed=False | |
| ) | |
| special_reward = SPECIAL_REWARDS[special_key] | |
| # Check if user has already claimed special reward | |
| if special_reward.claimed: | |
| return JSONResponse( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| content={"success": False, "error": "Special reward already claimed"} | |
| ) | |
| # Award credits | |
| reward_amount = 25 | |
| session_user.credits += reward_amount | |
| special_reward.claimed = True | |
| special_reward.claimed_at = datetime.now().isoformat() | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "credits_earned": reward_amount, | |
| "current_credits": session_user.credits | |
| } | |
| ) | |
| async def create_admin( | |
| request: Request, | |
| response: Response, | |
| ): | |
| """Create an admin user (only for development)""" | |
| # Check if admin already exists | |
| admin_exists = False | |
| admin_user = None | |
| for session in SESSIONS.values(): | |
| if session.get("user") and session["user"].username == "admin": | |
| admin_exists = True | |
| admin_user = session["user"] | |
| break | |
| if admin_exists: | |
| # Update admin credits | |
| admin_user.credits = 10000 | |
| admin_user.role = "admin" | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "message": "Admin user updated with 10000 credits", | |
| "user_id": admin_user.id | |
| } | |
| ) | |
| # Create admin user | |
| user_id = "admin" | |
| user = UserInfo( | |
| id=user_id, | |
| username="admin", | |
| email="admin@example.com", | |
| credits=10000, | |
| role="admin", | |
| last_login=datetime.now().isoformat() | |
| ) | |
| # In production, you would hash the password | |
| # For simplicity, we'll store it in a way that the login function can use it | |
| # This is for development only, never do this in production! | |
| admin_info = { | |
| "user": user, | |
| "password": "admin123", # Plaintext for demo only | |
| "created_at": datetime.now().isoformat() | |
| } | |
| # Store admin info for login | |
| API_KEYS["admin_key"] = admin_info | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "message": "Admin user created with 10000 credits", | |
| "user_id": user_id, | |
| "username": "admin", | |
| "password": "admin123" | |
| } | |
| ) | |
| async def admin_dashboard( | |
| request: Request, | |
| user: UserInfo = Depends(lambda: require_session_user(admin_required=True)) | |
| ): | |
| """Admin dashboard""" | |
| # Get all users | |
| users = [] | |
| for session_id, session in SESSIONS.items(): | |
| if "user" in session: | |
| users.append({ | |
| "id": session["user"].id, | |
| "username": session["user"].username, | |
| "email": session["user"].email, | |
| "credits": session["user"].credits, | |
| "role": getattr(session["user"], "role", "user"), | |
| "session_id": session_id, | |
| "expires": session["expires"].isoformat() if "expires" in session else None | |
| }) | |
| # Sort users by credits (descending) | |
| users.sort(key=lambda x: x["credits"], reverse=True) | |
| return templates.TemplateResponse( | |
| "admin_dashboard.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "user": user, | |
| "user_credits": user.credits, | |
| "users": users, | |
| "total_users": len(users), | |
| "total_credits": sum(u["credits"] for u in users), | |
| "admin_count": sum(1 for u in users if u["role"] == "admin") | |
| } | |
| ) | |
| async def admin_update_credits( | |
| request: Request, | |
| user: UserInfo = Depends(lambda: require_session_user(admin_required=True)) | |
| ): | |
| """Admin endpoint to update user credits""" | |
| data = await request.json() | |
| user_id = data.get("user_id") | |
| new_credits = data.get("credits") | |
| if not user_id or new_credits is None: | |
| return JSONResponse( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| content={"success": False, "error": "User ID and credits are required"} | |
| ) | |
| # Find the user | |
| target_user = None | |
| for session in SESSIONS.values(): | |
| if "user" in session and session["user"].id == user_id: | |
| target_user = session["user"] | |
| break | |
| if not target_user: | |
| return JSONResponse( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| content={"success": False, "error": "User not found"} | |
| ) | |
| # Update credits | |
| try: | |
| target_user.credits = float(new_credits) | |
| except ValueError: | |
| return JSONResponse( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| content={"success": False, "error": "Invalid credit amount"} | |
| ) | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "user_id": user_id, | |
| "new_credits": target_user.credits, | |
| "message": f"Credits updated for user {target_user.username}" | |
| } | |
| ) | |
| async def process_request( | |
| request: Request, | |
| tool_id: str = Form(...), | |
| prompt: str = Form(...), | |
| provider: str = Form("openai"), | |
| model: str = Form("default"), | |
| session_user: Optional[UserInfo] = Depends(get_session_user) | |
| ): | |
| """Process a tool request from a form""" | |
| # Get the tool | |
| tool = next((t for t in TOOLS if t.id == tool_id), None) | |
| if not tool: | |
| return templates.TemplateResponse( | |
| "error.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error_title": "Tool Not Found", | |
| "error_description": "The requested tool does not exist.", | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0, | |
| "tools": TOOLS # Include for consistent sidebar navigation | |
| } | |
| ) | |
| # Confirm that user is logged in | |
| if not session_user: | |
| return templates.TemplateResponse( | |
| "error.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error_title": "Login Required", | |
| "error_description": "You must be logged in to use AI tools.", | |
| "user": None, | |
| "user_credits": 0, | |
| "tools": TOOLS # Include for consistent sidebar navigation | |
| } | |
| ) | |
| # Check if the user has enough credits | |
| if session_user.credits < tool.cost: | |
| return templates.TemplateResponse( | |
| "ad_reward.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "user": session_user, | |
| "user_credits": session_user.credits, | |
| "tool": tool, | |
| "reward_ad": ads_manager.get_ad_code("reward_video"), | |
| "sidebar_ad": ads_manager.get_ad_code("sidebar"), | |
| "return_url": f"/tool/{tool_id}", | |
| "tools": TOOLS # Include for consistent sidebar navigation | |
| } | |
| ) | |
| # Get the provider | |
| try: | |
| provider_instance = get_provider(provider) | |
| if not provider_instance: | |
| return templates.TemplateResponse( | |
| "error.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error_title": "Provider Not Supported", | |
| "error_description": f"The provider '{provider}' is not supported.", | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0, | |
| "tools": TOOLS # Include for consistent sidebar navigation | |
| } | |
| ) | |
| result = None | |
| result_type = "text" # Default type | |
| # Determine result type based on tool ID | |
| if "image" in tool.id or "logo" in tool.id or "avatar" in tool.id: | |
| result_type = "image" | |
| elif "code" in tool.id or "debugging" in tool.id: | |
| result_type = "code" | |
| # Process the request based on tool type | |
| try: | |
| if result_type == "text": | |
| # Text generation tool | |
| result = provider_instance.generate_text( | |
| prompt=prompt, | |
| model=model if model != "default" else "mistralai/Mistral-7B-Instruct-v0.2", | |
| max_tokens=1000, | |
| temperature=0.7 | |
| ) | |
| elif result_type == "image": | |
| # Image generation tool | |
| try: | |
| result = provider_instance.generate_image( | |
| prompt=prompt, | |
| model=model if model != "default" else "stabilityai/stable-diffusion-xl-base-1.0" | |
| ) | |
| except AttributeError: | |
| # Provider doesn't support image generation | |
| return templates.TemplateResponse( | |
| "error.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error_title": "Provider Not Supported", | |
| "error_description": f"The provider '{provider}' does not support image generation.", | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0, | |
| "tools": TOOLS # Include for consistent sidebar navigation | |
| } | |
| ) | |
| elif result_type == "code": | |
| # Code generation tool | |
| result = provider_instance.generate_text( | |
| prompt=prompt, | |
| model=model if model != "default" else "gpt2", | |
| max_tokens=1500, | |
| temperature=0.5 | |
| ) | |
| else: | |
| return templates.TemplateResponse( | |
| "error.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error_title": "Tool Type Not Supported", | |
| "error_description": f"The tool type '{result_type}' is not supported.", | |
| "user": session_user, | |
| "user_credits": session_user.credits if session_user else 0, | |
| "tools": TOOLS # Include for consistent sidebar navigation | |
| } | |
| ) | |
| # Deduct credits | |
| session_user.credits -= tool.cost | |
| # Return the result | |
| return templates.TemplateResponse( | |
| "result.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "user": session_user, | |
| "user_credits": session_user.credits, | |
| "tool": tool, | |
| "prompt": prompt, | |
| "result": result, | |
| "result_type": result_type, | |
| "tools": TOOLS, # Include all tools for sidebar navigation | |
| "json_data": result.get("raw_response", {}) if isinstance(result, dict) else {} | |
| } | |
| ) | |
| except Exception as generate_error: | |
| return templates.TemplateResponse( | |
| "error.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error_title": "Generation Error", | |
| "error_description": f"Error generating content: {str(generate_error)}", | |
| "user": session_user, | |
| "user_credits": session_user.credits, | |
| "tools": TOOLS # Include all tools for sidebar navigation | |
| } | |
| ) | |
| except Exception as provider_error: | |
| return templates.TemplateResponse( | |
| "error.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error_title": "Provider Error", | |
| "error_description": f"Error with provider: {str(provider_error)}", | |
| "user": session_user, | |
| "user_credits": session_user.credits, | |
| "tools": TOOLS # Include all tools for sidebar navigation | |
| } | |
| ) | |
| async def register( | |
| request: Request, | |
| response: Response, | |
| username: str = Form(...), | |
| email: str = Form(...), | |
| password: str = Form(...), | |
| session_id: Optional[str] = Cookie(None) | |
| ): | |
| """Process registration""" | |
| # Check if username or email already exists (in production use a database) | |
| for session in SESSIONS.values(): | |
| if session.get("user"): | |
| if session["user"].username.lower() == username.lower(): | |
| return templates.TemplateResponse( | |
| "register.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error": "Username already exists", | |
| "form_data": {"username": username, "email": email} | |
| }, | |
| status_code=400 | |
| ) | |
| if session["user"].email.lower() == email.lower(): | |
| return templates.TemplateResponse( | |
| "register.html", | |
| { | |
| "request": request, | |
| "app_name": "AI Tool Hub", | |
| "error": "Email already exists", | |
| "form_data": {"username": username, "email": email} | |
| }, | |
| status_code=400 | |
| ) | |
| # Create user | |
| user_id = f"user_{username.lower().replace(' ', '_')}" | |
| # Check if there's an existing temporary session to convert | |
| is_converting_temp = False | |
| temp_credits = 0 | |
| if session_id and session_id in SESSIONS: | |
| session = SESSIONS[session_id] | |
| if session.get("is_temporary", False): | |
| # Convert temporary session to permanent | |
| is_converting_temp = True | |
| temp_credits = session["user"].credits | |
| # Delete the temporary session | |
| del SESSIONS[session_id] | |
| # Create new user with starting credits (more if converting from temp) | |
| start_credits = 10.0 + temp_credits | |
| user = UserInfo( | |
| id=user_id, | |
| username=username, | |
| email=email, | |
| credits=start_credits, | |
| last_login=datetime.now().isoformat() | |
| ) | |
| # In production, you would hash the password and store in database | |
| # Create new session | |
| new_session_id = secrets.token_urlsafe(32) | |
| SESSIONS[new_session_id] = { | |
| "user": user, | |
| "expires": datetime.now() + timedelta(hours=24) | |
| } | |
| # Set session cookie | |
| response = RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) | |
| response.set_cookie( | |
| key="session_id", | |
| value=new_session_id, | |
| max_age=86400, # 24 hoursgj | |
| httponly=True, | |
| samesite="lax" | |
| ) | |
| return response | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8006) |