"""Website Scraper API (Enhanced for RCS).

Crawls a website, summarizes each scraped page into a rich card, asks Gemini
for contextual quick-reply buttons, publishes the cards as a NativeMSG bot and
e-mails the requester a link to an HTML preview of the cards.
"""

import asyncio
import http.client
import json
import logging
import os
import re
import uuid
from typing import Optional, List, Dict
from urllib.parse import urlparse

import google.generativeai as genai
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import HttpUrl, EmailStr

from rich_card_builder import build_rich_card
from scraper import scrape_page
from send_email import send_rcs_email
from summarizer import quick_summarize

load_dotenv()

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = FastAPI(title="Website Scraper API (Enhanced for RCS)")

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")

# In-memory session storage (session_id -> {"rich_cards": ..., "bot_response": ...}).
# NOTE(review): entries are never evicted and are lost on restart.
sessions = {}

# Crawl limits used by the /scrape endpoint.
MAX_PAGES = 20  # Limited to 20 for demo
CRAWL_DELAY_SECONDS = 0.5

# Gemini call tuning for button generation.
GEMINI_MAX_RETRIES = 3
GEMINI_RETRY_DELAY_SECONDS = 1
GEMINI_INPUT_CHAR_LIMIT = 500

# Socket timeout for the NativeMSG request so a stalled peer cannot hang a worker forever.
NATIVEMSG_HOST = "api.nativemsg.com"
NATIVEMSG_TIMEOUT_SECONDS = 30

# Leading ```/```json fence (with optional language tag) and trailing ``` fence.
_LEADING_FENCE_RE = re.compile(r"^```[A-Za-z0-9_-]*\s*")
_TRAILING_FENCE_RE = re.compile(r"\s*```$")


def _strip_code_fence(text: str) -> str:
    """Return *text* without a surrounding markdown code fence, if any.

    ``str.strip('```json')`` treats its argument as a *set of characters*, so
    it is not a safe way to drop a fence; this removes the fence as a
    prefix/suffix instead.
    """
    unfenced = _LEADING_FENCE_RE.sub("", text.strip())
    unfenced = _TRAILING_FENCE_RE.sub("", unfenced)
    return unfenced.strip()


def _fallback_quick_replies(next_interaction: Optional[str]) -> List[Dict]:
    """Return the static "View Details" quick reply, or ``[]`` when there is no next interaction."""
    if not next_interaction:
        return []
    return [
        {
            "type": "postback",
            "title": "View Details",
            "payload": f"goto_{next_interaction}",
            "execute": next_interaction
        }
    ]


async def generate_dynamic_buttons(title: str, description: str, url: str, next_interaction: Optional[str] = None) -> List[Dict]:
    """Generate dynamic quick reply buttons for the next interaction using Gemini-1.5 pro.

    Args:
        title: Card title; blank/empty values fall back to "News Summary".
        description: Card text; blank/empty values fall back to a generic sentence.
        url: Card URL; blank/empty values fall back to "https://example.com".
        next_interaction: Name of the interaction the buttons should execute.

    Returns:
        Up to two postback quick replies built from Gemini's suggestions. If
        the API key is missing, every attempt fails, or any other error
        occurs, the static "View Details" reply is returned instead (or an
        empty list when ``next_interaction`` is falsy). This function never
        raises.
    """
    try:
        # Validate inputs with defaults
        title = title.strip() if title and title.strip() else "News Summary"
        description = description.strip() if description and description.strip() else "Explore news and insights."
        url = url.strip() if url and url.strip() else "https://example.com"
        logging.info(f"Generating buttons for: title={title}, description={description[:30]}..., url={url}")

        # Get Gemini API key
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            logging.error("Gemini API key not found. Please set GEMINI_API_KEY in .env file.")
            return _fallback_quick_replies(next_interaction)

        # Configure Gemini client
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-1.5-pro')

        # Combine inputs; truncate to avoid exceeding limits
        input_text = f"Title: {title}\nDescription: {description}\nURL: {url}"
        input_text = input_text[:GEMINI_INPUT_CHAR_LIMIT]

        # Optimized prompt for dynamic, contextually relevant buttons
        prompt = (
            f"Based on the following content, generate up to two concise (3-8 words) quick reply button titles that are action-oriented, engaging, and relevant to the content. Avoid generic terms like 'Show Next' or 'Explore More'. Return the titles as a JSON array of strings.\n\n"
            f"Content:\n{input_text}\n\n"
            f"Example output: [\"Discover New Styles\", \"Shop Now Online\"]\n"
            f"Return only the JSON array, no markdown or extra text."
        )

        # Retry mechanism for API calls; parse/validation failures are retried too.
        for attempt in range(GEMINI_MAX_RETRIES):
            try:
                response = await model.generate_content_async(prompt)
                raw_content = response.text.strip()
                logging.info(f"Gemini response: {raw_content}")

                # Remove markdown code block markers if present
                raw_content = _strip_code_fence(raw_content)

                # Parse response
                button_titles = json.loads(raw_content)
                if not isinstance(button_titles, list) or not all(isinstance(t, str) for t in button_titles):
                    logging.warning(f"Invalid Gemini response format: {raw_content}")
                    raise ValueError("Response is not a list of strings")

                # Keep only titles that respect the 3-8 word limit from the prompt.
                stripped_titles = (t.strip() for t in button_titles)
                valid_buttons = [t for t in stripped_titles if 3 <= len(t.split()) <= 8]
                if not valid_buttons:
                    logging.warning("No valid button titles in response")
                    raise ValueError("No valid button titles")

                # Create quick replies (at most two)
                quick_replies = [
                    {
                        "type": "postback",
                        "title": button_title,
                        "payload": f"goto_{next_interaction}_{i}",
                        "execute": next_interaction
                    }
                    for i, button_title in enumerate(valid_buttons[:2])
                ]
                logging.info(f"Generated quick replies: {quick_replies}")
                return quick_replies
            except Exception as e:
                logging.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt < GEMINI_MAX_RETRIES - 1:
                    await asyncio.sleep(GEMINI_RETRY_DELAY_SECONDS)  # Wait before retrying

        # Fallback if all retries fail
        logging.error("All retries failed for button generation")
        return _fallback_quick_replies(next_interaction)
    except Exception as e:
        # Deliberate best-effort: button generation must never break the request.
        logging.error(f"Error generating dynamic buttons: {str(e)}")
        return _fallback_quick_replies(next_interaction)


def _post_bot_payload(payload: Dict, api_token: str):
    """POST *payload* to the NativeMSG ``/v1/bots`` endpoint (blocking).

    Returns:
        A ``(status, body)`` tuple with the HTTP status code and the decoded
        response body. The connection is always closed.
    """
    connection = http.client.HTTPSConnection(NATIVEMSG_HOST, timeout=NATIVEMSG_TIMEOUT_SECONDS)
    try:
        headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json"
        }
        connection.request("POST", "/v1/bots", json.dumps(payload), headers)
        response = connection.getresponse()
        return response.status, response.read().decode('utf-8')
    finally:
        connection.close()


async def create_nativemsg_bot(rich_cards: List[Dict], url: str, bot_name: str, api_token: str) -> Dict:
    """Create a bot on NativeMSG with connected interactions based on rich cards.

    Args:
        rich_cards: Cards to publish; each must provide ``title`` and ``text``
            and may provide ``media``, ``buttons`` and ``quickReplies``.
        url: Source website URL; its host is used in the welcome text and in
            the default bot name.
        bot_name: Bot name; when falsy, "Bot for <host>" is used.
        api_token: NativeMSG bearer token.

    Returns:
        The JSON-decoded response body from NativeMSG.

    Raises:
        HTTPException: 500 when the token is missing, the request fails, the
            API answers with a non-200 status, or the response is not JSON.
    """
    try:
        # Validate API token
        if not api_token:
            logging.error("NativeMSG API token not provided and not found in .env file.")
            raise ValueError("NativeMSG API token is required.")

        site_host = urlparse(url).netloc

        # Use provided bot name or default to dynamic name
        final_bot_name = bot_name or f"Bot for {site_host}"

        # Welcome interaction goes first and links to the first card.
        welcome_message = {
            "text": f"Welcome to the {site_host} RCS Bot! Explore the latest content.",
            "richCard": {
                "cardOrientation": "VERTICAL",
                "mediaHeight": "MEDIUM"
            },
            "quickReplies": [
                {
                    "type": "postback",
                    "title": "Start Exploring",
                    "payload": "start_exploring",
                    "execute": "Interaction #1"
                }
            ]
        }
        interactions = [
            {
                "name": "Welcome Interaction",
                "intents": ["start", "welcome"],
                "actions": [
                    {
                        "send": {"message": welcome_message},
                        "type": "RichCard",
                        "name": "Send Welcome Message"
                    }
                ]
            }
        ]

        # One interaction per rich card, numbered from 1 so names match the
        # "Interaction #N" targets used in the quick replies.
        for idx, card in enumerate(rich_cards, 1):
            message = {
                "text": f"{card['title']}\n\n{card['text']}",
                "mediaType": "image",
                "media": card.get("media", "") or "https://example.com/placeholder.jpg",
                "richCard": {
                    "cardOrientation": "VERTICAL",
                    "mediaHeight": "MEDIUM"
                },
                "buttons": card.get("buttons", []),
                "quickReplies": card.get("quickReplies", [])
            }
            interactions.append(
                {
                    "name": f"Interaction #{idx}",
                    "intents": ["show_content", f"content_{idx}"],
                    "actions": [
                        {
                            "send": {"message": message},
                            "type": "RichCard",
                            "name": f"Send Rich Card #{idx}"
                        }
                    ]
                }
            )

        payload = {
            "name": final_bot_name,
            "interactions": interactions
        }

        # Log the payload for debugging
        logging.info(f"NativeMSG bot payload: {json.dumps(payload, indent=2)}")

        # http.client is blocking; run it in the default executor so the event
        # loop keeps serving other requests while we wait on the network.
        loop = asyncio.get_running_loop()
        status, response_data = await loop.run_in_executor(None, _post_bot_payload, payload, api_token)
        logging.info(f"NativeMSG bot creation response: Status {status}, Data: {response_data}")

        if status != 200:
            logging.error(f"Failed to create bot: {response_data}")
            raise HTTPException(status_code=500, detail=f"Failed to create bot: {response_data}")

        return json.loads(response_data)
    except HTTPException:
        # Already a well-formed API error; re-wrapping would garble its detail.
        raise
    except Exception as e:
        logging.error(f"Error creating NativeMSG bot: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to create bot: {str(e)}") from e


@app.get("/scrape")
async def crawl_website(
    url: HttpUrl,
    email: EmailStr,
    bot_name: Optional[str] = None,
    nativemsg_token: Optional[str] = None
):
    """Crawl a website, generate rich cards with dynamic buttons, create a NativeMSG bot, and send an email with a link to view the RCS.

    Args:
        url: Start URL of the crawl.
        email: Address that receives the link to the generated cards.
        bot_name: Optional bot name forwarded to ``create_nativemsg_bot``.
        nativemsg_token: Optional NativeMSG token; falls back to the
            ``NATIVEMSG_API_TOKEN`` environment variable.

    Returns:
        A dict with ``rich_cards``, ``bot_response`` and ``view_link``.

    Raises:
        HTTPException: 400 when nothing could be scraped; errors raised by bot
            creation are propagated unchanged; any other failure becomes a 500.
    """
    try:
        # Determine API token
        api_token = nativemsg_token or os.getenv("NATIVEMSG_API_TOKEN")

        # Scrape the website
        start_url = str(url)
        visited = set()
        to_visit = {start_url}
        base_domain = urlparse(start_url).netloc
        results = []

        while to_visit and len(visited) < MAX_PAGES:
            current_url = to_visit.pop()
            if current_url in visited:
                continue
            visited.add(current_url)

            logging.info(f"Scraping page: {current_url}")
            page_data, new_links = await scrape_page(current_url, visited, base_domain)
            if page_data:
                logging.info(f"Scraped data: {page_data}")
                summary = await quick_summarize(page_data["text"], page_data["url"])
                rich_card = build_rich_card(page_data, summary)
                rich_card["title"] = summary.get("title", "News Summary")
                rich_card["url"] = page_data.get("url", start_url)
                results.append(rich_card)

            to_visit.update(new_links)
            await asyncio.sleep(CRAWL_DELAY_SECONDS)

        if not results:
            logging.error("No rich cards generated from scraping.")
            raise HTTPException(status_code=400, detail="No content scraped from the provided URL.")

        # Generate dynamic quick replies for each rich card. Buttons describe
        # the *next* card (they lead to it); the last card has no successor,
        # so its buttons are generated from its own content with no target.
        last_index = len(results) - 1
        for idx, card in enumerate(results):
            if idx < last_index:
                source_card = results[idx + 1]
                # +2: interaction names are 1-based, and we point at the following card.
                next_interaction = f"Interaction #{idx + 2}"
            else:
                source_card = card
                next_interaction = None

            dynamic_quick_replies = await generate_dynamic_buttons(
                title=source_card.get("title", "News Summary"),
                description=source_card.get("text", "Explore news and insights."),
                url=source_card.get("url", ""),
                next_interaction=next_interaction
            )

            # Update the rich card's quickReplies
            card["quickReplies"] = dynamic_quick_replies + [
                {
                    "type": "call",
                    "title": "Contact Support",
                    "payload": "+12345678901"
                }
            ]

        # Create NativeMSG bot with the rich cards
        bot_response = await create_nativemsg_bot(results, start_url, bot_name, api_token)

        # Store the results
        session_id = str(uuid.uuid4())
        sessions[session_id] = {
            "rich_cards": results,
            "bot_response": bot_response
        }
        logging.info(f"Session created with ID: {session_id}, Session data: {sessions[session_id]}")

        # Generate the direct link to view the RCS
        direct_link = f"https://aideveloper1-rcs.hf.space/view-rcs/{session_id}"
        logging.info(f"Generated direct link: {direct_link}")

        # Send email with the direct link
        await send_rcs_email(email, direct_link)

        logging.info(f"Final response: {results}, Bot: {bot_response}, Email sent to: {email}, Session ID: {session_id}")
        return {"rich_cards": results, "bot_response": bot_response, "view_link": direct_link}
    except HTTPException:
        # Keep deliberate status codes (e.g. the 400 above) instead of
        # flattening every error into a 500.
        raise
    except Exception as e:
        logging.error(f"Scraping or bot creation failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e


@app.get("/view-rcs/{session_id}", response_class=HTMLResponse)
async def view_rcs(session_id: str, request: Request):
    """Serve the RCS cards for a specific session ID.

    Raises:
        HTTPException: 404 when ``session_id`` is not in the in-memory store.
    """
    logging.info(f"Attempting to access session with ID: {session_id}")
    logging.info(f"Current sessions: {list(sessions.keys())}")

    if session_id not in sessions:
        logging.error(f"Session ID {session_id} not found in sessions.")
        raise HTTPException(status_code=404, detail="Session not found.")

    rich_cards = sessions[session_id]["rich_cards"]
    logging.info(f"Retrieved session data for ID {session_id}: {rich_cards}")
    return templates.TemplateResponse("rcs_view.html", {"request": request, "rich_cards": rich_cards})


@app.get("/", response_class=HTMLResponse)
async def serve_home(request: Request):
    """Serve the frontend HTML page."""
    return templates.TemplateResponse("index.html", {"request": request})


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8001)