Spaces:
Running
Running
| """ | |
| Lead Storage Service | |
| Saves customer leads collected during scenario conversations | |
| """ | |
| from typing import Dict, Optional | |
| from datetime import datetime | |
| from pymongo.collection import Collection | |
| class LeadStorageService: | |
| """ | |
| Store customer leads from scenario interactions | |
| """ | |
| def __init__(self, leads_collection: Collection): | |
| self.collection = leads_collection | |
| self._ensure_indexes() | |
| def _ensure_indexes(self): | |
| """Create indexes for leads collection""" | |
| try: | |
| self.collection.create_index("email") | |
| self.collection.create_index("phone") | |
| self.collection.create_index("created_at") | |
| print("✓ Leads indexes created") | |
| except Exception as e: | |
| print(f"Leads indexes already exist: {e}") | |
| def save_lead( | |
| self, | |
| event_name: str, | |
| email: Optional[str] = None, | |
| phone: Optional[str] = None, | |
| interests: Optional[Dict] = None, | |
| session_id: Optional[str] = None, | |
| user_id: Optional[str] = None | |
| ) -> str: | |
| """ | |
| Save customer lead | |
| Args: | |
| event_name: Event they're interested in | |
| email: Customer email | |
| phone: Customer phone | |
| interests: Additional data (group_size, etc.) | |
| session_id: Conversation session | |
| user_id: User ID if authenticated | |
| Returns: | |
| Lead ID | |
| """ | |
| lead = { | |
| "event_name": event_name, | |
| "email": email, | |
| "phone": phone, | |
| "interests": interests or {}, | |
| "session_id": session_id, | |
| "user_id": user_id, | |
| "source": "chatbot_scenario", | |
| "created_at": datetime.utcnow(), | |
| "status": "new" | |
| } | |
| result = self.collection.insert_one(lead) | |
| lead_id = str(result.inserted_id) | |
| print(f"💾 Saved lead: {lead_id} | Event: {event_name} | Email: {email} | Phone: {phone}") | |
| return lead_id | |
| def get_leads( | |
| self, | |
| event_name: Optional[str] = None, | |
| limit: int = 50, | |
| skip: int = 0 | |
| ): | |
| """Get leads with optional filtering""" | |
| query = {} | |
| if event_name: | |
| query["event_name"] = event_name | |
| leads = self.collection.find(query).sort("created_at", -1).skip(skip).limit(limit) | |
| return list(leads) | |
| def count_leads(self, event_name: Optional[str] = None) -> int: | |
| """Count total leads""" | |
| query = {} | |
| if event_name: | |
| query["event_name"] = event_name | |
| return self.collection.count_documents(query) | |