Spaces:
Running
Running
File size: 2,797 Bytes
75033ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
"""
Lead Storage Service
Saves customer leads collected during scenario conversations
"""
from typing import Dict, Optional
from datetime import datetime
from pymongo.collection import Collection
class LeadStorageService:
"""
Store customer leads from scenario interactions
"""
def __init__(self, leads_collection: Collection):
self.collection = leads_collection
self._ensure_indexes()
def _ensure_indexes(self):
"""Create indexes for leads collection"""
try:
self.collection.create_index("email")
self.collection.create_index("phone")
self.collection.create_index("created_at")
print("✓ Leads indexes created")
except Exception as e:
print(f"Leads indexes already exist: {e}")
def save_lead(
self,
event_name: str,
email: Optional[str] = None,
phone: Optional[str] = None,
interests: Optional[Dict] = None,
session_id: Optional[str] = None,
user_id: Optional[str] = None
) -> str:
"""
Save customer lead
Args:
event_name: Event they're interested in
email: Customer email
phone: Customer phone
interests: Additional data (group_size, etc.)
session_id: Conversation session
user_id: User ID if authenticated
Returns:
Lead ID
"""
lead = {
"event_name": event_name,
"email": email,
"phone": phone,
"interests": interests or {},
"session_id": session_id,
"user_id": user_id,
"source": "chatbot_scenario",
"created_at": datetime.utcnow(),
"status": "new"
}
result = self.collection.insert_one(lead)
lead_id = str(result.inserted_id)
print(f"💾 Saved lead: {lead_id} | Event: {event_name} | Email: {email} | Phone: {phone}")
return lead_id
def get_leads(
self,
event_name: Optional[str] = None,
limit: int = 50,
skip: int = 0
):
"""Get leads with optional filtering"""
query = {}
if event_name:
query["event_name"] = event_name
leads = self.collection.find(query).sort("created_at", -1).skip(skip).limit(limit)
return list(leads)
def count_leads(self, event_name: Optional[str] = None) -> int:
"""Count total leads"""
query = {}
if event_name:
query["event_name"] = event_name
return self.collection.count_documents(query)
|