Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import HTMLResponse | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import HttpUrl, EmailStr | |
| from scraper import scrape_page | |
| from summarizer import quick_summarize | |
| from rich_card_builder import build_rich_card | |
| from send_email import send_rcs_email | |
| import asyncio | |
| from urllib.parse import urlparse | |
| import logging | |
| import uuid | |
| import json | |
| import http.client | |
| from dotenv import load_dotenv | |
| import os | |
| import google.generativeai as genai | |
| from typing import Optional, List, Dict | |
# Read variables from a .env file into the process environment; the handlers
# below look up GEMINI_API_KEY and NATIVEMSG_API_TOKEN through os.getenv.
load_dotenv()

# Root logger: INFO and above, timestamped.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# The ASGI application object.
app = FastAPI(title="Website Scraper API (Enhanced for RCS)")

# Files under ./static are served at /static.
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)

# Jinja2 templates are loaded from ./templates.
templates = Jinja2Templates(directory="templates")

# Process-local session store, keyed by session id. Contents are lost on
# restart and are not shared between worker processes.
sessions = {}
def _fallback_quick_replies(next_interaction: Optional[str]) -> List[Dict]:
    """Return the static "View Details" reply, or no replies without a target interaction."""
    if not next_interaction:
        return []
    return [
        {
            "type": "postback",
            "title": "View Details",
            "payload": f"goto_{next_interaction}",
            "execute": next_interaction,
        }
    ]


def _strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```json) from *text*."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        # Drop the optional language tag that follows the opening fence.
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    return cleaned.strip()


async def generate_dynamic_buttons(title: str, description: str, url: str, next_interaction: Optional[str] = None) -> List[Dict]:
    """Generate dynamic quick reply buttons for the next interaction using Gemini-1.5 pro.

    Args:
        title: Title of the content the buttons should refer to; blank or
            ``None`` falls back to "News Summary".
        description: Description of the content; blank or ``None`` falls back
            to "Explore news and insights.".
        url: URL of the content; blank or ``None`` falls back to
            "https://example.com".
        next_interaction: Name of the interaction the buttons should execute.

    Returns:
        Up to two postback quick replies built from the model's suggestions.
        If the API key is missing, every attempt fails, or an unexpected error
        occurs, a single "View Details" reply is returned instead (or an empty
        list when ``next_interaction`` is falsy). This function never raises.
    """
    try:
        # Normalise inputs, substituting defaults for missing/blank values.
        title = (title or "").strip() or "News Summary"
        description = (description or "").strip() or "Explore news and insights."
        url = (url or "").strip() or "https://example.com"
        logging.info(f"Generating buttons for: title={title}, description={description[:30]}..., url={url}")

        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            logging.error("Gemini API key not found. Please set GEMINI_API_KEY in .env file.")
            return _fallback_quick_replies(next_interaction)

        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-1.5-pro')

        # Truncate the combined input to keep the prompt small.
        input_text = f"Title: {title}\nDescription: {description}\nURL: {url}"[:500]
        prompt = (
            "Based on the following content, generate up to two concise (3-8 words) "
            "quick reply button titles that are action-oriented, engaging, and relevant "
            "to the content. Avoid generic terms like 'Show Next' or 'Explore More'. "
            "Return the titles as a JSON array of strings.\n\n"
            f"Content:\n{input_text}\n\n"
            'Example output: ["Discover New Styles", "Shop Now Online"]\n'
            "Return only the JSON array, no markdown or extra text."
        )

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = await model.generate_content_async(prompt)
                raw_content = response.text.strip()
                logging.info(f"Gemini response: {raw_content}")

                # str.strip('```json') would strip a *set of characters*, not a
                # prefix, so the fence is removed explicitly instead.
                button_titles = json.loads(_strip_code_fence(raw_content))
                if not isinstance(button_titles, list) or not all(isinstance(t, str) for t in button_titles):
                    logging.warning(f"Invalid Gemini response format: {raw_content}")
                    raise ValueError("Response is not a list of strings")

                # Keep only titles of 3 to 8 words, as requested in the prompt.
                valid_buttons = [
                    candidate.strip()
                    for candidate in button_titles
                    if 3 <= len(candidate.split()) <= 8
                ]
                if not valid_buttons:
                    logging.warning("No valid button titles in response")
                    raise ValueError("No valid button titles")

                # NOTE(review): when next_interaction is None the payload reads
                # "goto_None_<i>" and "execute" is None; behaviour kept as-is,
                # confirm this is what the last card should carry.
                quick_replies = [
                    {
                        "type": "postback",
                        "title": button_title,
                        "payload": f"goto_{next_interaction}_{i}",
                        "execute": next_interaction,
                    }
                    for i, button_title in enumerate(valid_buttons[:2])
                ]
                logging.info(f"Generated quick replies: {quick_replies}")
                return quick_replies
            except Exception as e:
                # Any failure (API error, blocked response, bad JSON, invalid
                # titles) counts as a failed attempt and triggers a retry.
                logging.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(1)  # Wait before retrying

        logging.error("All retries failed for button generation")
        return _fallback_quick_replies(next_interaction)
    except Exception as e:
        # Button generation is best-effort: never let it break the caller.
        logging.error(f"Error generating dynamic buttons: {str(e)}")
        return _fallback_quick_replies(next_interaction)
def _post_bot_payload(body: str, api_token: str) -> tuple:
    """POST the serialized bot definition to NativeMSG; return (status, response text)."""
    # A timeout keeps a stalled connection from hanging the request forever.
    connection = http.client.HTTPSConnection("api.nativemsg.com", timeout=30)
    try:
        headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        }
        connection.request("POST", "/v1/bots", body, headers)
        response = connection.getresponse()
        return response.status, response.read().decode('utf-8')
    finally:
        # Always release the socket, including when the request fails.
        connection.close()


async def create_nativemsg_bot(rich_cards: List[Dict], url: str, bot_name: Optional[str], api_token: Optional[str]) -> Dict:
    """Create a bot on NativeMSG with connected interactions based on rich cards.

    One "Interaction #<n>" (numbered from 1) is built per rich card, preceded
    by a "Welcome Interaction" whose quick reply executes "Interaction #1".

    Args:
        rich_cards: Cards to publish. Each must provide "title" and "text";
            "media", "buttons" and "quickReplies" are optional.
        url: Source site URL; its host is used in the welcome text and in the
            default bot name.
        bot_name: Name for the bot; falsy values fall back to
            "Bot for <host>".
        api_token: NativeMSG bearer token.

    Returns:
        The JSON-decoded body of the NativeMSG response.

    Raises:
        HTTPException: status 500 when the token is missing, the API answers
            with a status other than 200, or any other error occurs.
    """
    try:
        if not api_token:
            logging.error("NativeMSG API token not provided and not found in .env file.")
            raise ValueError("NativeMSG API token is required.")

        domain = urlparse(url).netloc
        final_bot_name = bot_name or f"Bot for {domain}"

        # One rich-card interaction per scraped card, numbered from 1.
        interactions = []
        for idx, card in enumerate(rich_cards, 1):
            message = {
                "text": f"{card['title']}\n\n{card['text']}",
                "mediaType": "image",
                # Missing or empty media falls back to a placeholder image.
                "media": card.get("media", "") or "https://example.com/placeholder.jpg",
                "richCard": {
                    "cardOrientation": "VERTICAL",
                    "mediaHeight": "MEDIUM",
                },
                "buttons": card.get("buttons", []),
                "quickReplies": card.get("quickReplies", []),
            }
            interactions.append(
                {
                    "name": f"Interaction #{idx}",
                    "intents": ["show_content", f"content_{idx}"],
                    "actions": [
                        {
                            "send": {"message": message},
                            "type": "RichCard",
                            "name": f"Send Rich Card #{idx}",
                        }
                    ],
                }
            )

        # The welcome interaction goes first and links to the first card.
        welcome_message = {
            "text": f"Welcome to the {domain} RCS Bot! Explore the latest content.",
            "richCard": {
                "cardOrientation": "VERTICAL",
                "mediaHeight": "MEDIUM",
            },
            "quickReplies": [
                {
                    "type": "postback",
                    "title": "Start Exploring",
                    "payload": "start_exploring",
                    "execute": "Interaction #1",
                }
            ],
        }
        interactions.insert(
            0,
            {
                "name": "Welcome Interaction",
                "intents": ["start", "welcome"],
                "actions": [
                    {
                        "send": {"message": welcome_message},
                        "type": "RichCard",
                        "name": "Send Welcome Message",
                    }
                ],
            },
        )

        payload = {
            "name": final_bot_name,
            "interactions": interactions,
        }
        logging.info(f"NativeMSG bot payload: {json.dumps(payload, indent=2)}")

        # http.client is blocking; run it in the default executor so the
        # event loop keeps serving other requests during the call.
        loop = asyncio.get_running_loop()
        status, response_data = await loop.run_in_executor(
            None, _post_bot_payload, json.dumps(payload), api_token
        )
        logging.info(f"NativeMSG bot creation response: Status {status}, Data: {response_data}")

        if status != 200:
            logging.error(f"Failed to create bot: {response_data}")
            raise HTTPException(status_code=500, detail=f"Failed to create bot: {response_data}")
        return json.loads(response_data)
    except HTTPException:
        # Already carries the intended status and detail; do not re-wrap it.
        raise
    except Exception as e:
        logging.error(f"Error creating NativeMSG bot: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to create bot: {str(e)}")
async def crawl_website(
    url: HttpUrl,
    email: EmailStr,
    bot_name: Optional[str] = None,
    nativemsg_token: Optional[str] = None
):
    """Crawl a website, generate rich cards with dynamic buttons, create a NativeMSG bot, and send an email with a link to view the RCS.

    Args:
        url: Start page of the crawl; its host is passed to the scraper as the
            base domain.
        email: Recipient of the email containing the view link.
        bot_name: Optional name for the NativeMSG bot.
        nativemsg_token: Optional NativeMSG token; when omitted, the
            NATIVEMSG_API_TOKEN environment variable is used.

    Returns:
        A dict with "rich_cards", "bot_response" and "view_link".

    Raises:
        HTTPException: status 400 when no page produced a rich card; an
            HTTPException raised by bot creation is propagated unchanged;
            status 500 for any other failure.
    """
    try:
        api_token = nativemsg_token or os.getenv("NATIVEMSG_API_TOKEN")

        # Crawl starting at `url`, visiting at most 20 pages.
        visited = set()
        to_visit = {str(url)}
        base_domain = urlparse(str(url)).netloc
        results = []
        while to_visit and len(visited) < 20:  # Limited to 20 for demo
            current_url = to_visit.pop()
            if current_url in visited:
                continue
            visited.add(current_url)
            logging.info(f"Scraping page: {current_url}")
            page_data, new_links = await scrape_page(current_url, visited, base_domain)
            if page_data:
                logging.info(f"Scraped data: {page_data}")
                summary = await quick_summarize(page_data["text"], page_data["url"])
                rich_card = build_rich_card(page_data, summary)
                rich_card["title"] = summary.get("title", "News Summary")
                rich_card["url"] = page_data.get("url", str(url))
                results.append(rich_card)
            # NOTE(review): links are queued even when the page yielded no
            # data; confirm this matches the intended crawl behaviour.
            to_visit.update(new_links or ())
            await asyncio.sleep(0.5)  # Pause between page fetches

        if not results:
            logging.error("No rich cards generated from scraping.")
            raise HTTPException(status_code=400, detail="No content scraped from the provided URL.")

        # Each card's buttons describe the *next* card and execute its
        # interaction. Card index 0 is published as "Interaction #1", so the
        # card after index `idx` is "Interaction #{idx + 2}". The last card
        # has no successor: its buttons are generated from its own content
        # with no target interaction.
        last_index = len(results) - 1
        for idx, card in enumerate(results):
            has_next = idx < last_index
            source_card = results[idx + 1] if has_next else card
            dynamic_quick_replies = await generate_dynamic_buttons(
                title=source_card.get("title", "News Summary"),
                description=source_card.get("text", "Explore news and insights."),
                url=source_card.get("url", ""),
                next_interaction=f"Interaction #{idx + 2}" if has_next else None,
            )
            card["quickReplies"] = dynamic_quick_replies + [
                {
                    "type": "call",
                    "title": "Contact Support",
                    "payload": "+12345678901",
                }
            ]

        bot_response = await create_nativemsg_bot(results, str(url), bot_name, api_token)

        # Keep the cards in the in-memory session store so the view link
        # can render them later.
        session_id = str(uuid.uuid4())
        sessions[session_id] = {
            "rich_cards": results,
            "bot_response": bot_response,
        }
        logging.info(f"Session created with ID: {session_id}, Session data: {sessions[session_id]}")

        direct_link = f"https://aideveloper1-rcs.hf.space/view-rcs/{session_id}"
        logging.info(f"Generated direct link: {direct_link}")

        await send_rcs_email(email, direct_link)
        logging.info(f"Final response: {results}, Bot: {bot_response}, Email sent to: {email}, Session ID: {session_id}")
        return {"rich_cards": results, "bot_response": bot_response, "view_link": direct_link}
    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. the 400 above) instead of
        # converting them into a generic 500.
        raise
    except Exception as e:
        logging.error(f"Scraping or bot creation failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
async def view_rcs(session_id: str, request: Request):
    """Render the stored rich cards of one session with the rcs_view.html template.

    Raises HTTPException (404) when ``session_id`` is not in the session store.
    """
    logging.info(f"Attempting to access session with ID: {session_id}")
    known_ids = list(sessions)
    logging.info(f"Current sessions: {known_ids}")

    if session_id in sessions:
        rich_cards = sessions[session_id]["rich_cards"]
        logging.info(f"Retrieved session data for ID {session_id}: {rich_cards}")
        context = {"request": request, "rich_cards": rich_cards}
        return templates.TemplateResponse("rcs_view.html", context)

    # Unknown session id.
    logging.error(f"Session ID {session_id} not found in sessions.")
    raise HTTPException(status_code=404, detail="Session not found.")
async def serve_home(request: Request):
    """Render index.html, the frontend landing page."""
    # The template context carries only the incoming request.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
# When executed directly as a script, serve the app with uvicorn on all
# interfaces, port 8001.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        port=8001,
        host="0.0.0.0",
    )