ChatbotRAG / lead_storage_service.py
minhvtt's picture
Upload 26 files
75033ed verified
raw
history blame
2.8 kB
"""
Lead Storage Service
Saves customer leads collected during scenario conversations
"""
from typing import Dict, Optional
from datetime import datetime
from pymongo.collection import Collection
class LeadStorageService:
"""
Store customer leads from scenario interactions
"""
def __init__(self, leads_collection: Collection):
self.collection = leads_collection
self._ensure_indexes()
def _ensure_indexes(self):
"""Create indexes for leads collection"""
try:
self.collection.create_index("email")
self.collection.create_index("phone")
self.collection.create_index("created_at")
print("✓ Leads indexes created")
except Exception as e:
print(f"Leads indexes already exist: {e}")
def save_lead(
self,
event_name: str,
email: Optional[str] = None,
phone: Optional[str] = None,
interests: Optional[Dict] = None,
session_id: Optional[str] = None,
user_id: Optional[str] = None
) -> str:
"""
Save customer lead
Args:
event_name: Event they're interested in
email: Customer email
phone: Customer phone
interests: Additional data (group_size, etc.)
session_id: Conversation session
user_id: User ID if authenticated
Returns:
Lead ID
"""
lead = {
"event_name": event_name,
"email": email,
"phone": phone,
"interests": interests or {},
"session_id": session_id,
"user_id": user_id,
"source": "chatbot_scenario",
"created_at": datetime.utcnow(),
"status": "new"
}
result = self.collection.insert_one(lead)
lead_id = str(result.inserted_id)
print(f"💾 Saved lead: {lead_id} | Event: {event_name} | Email: {email} | Phone: {phone}")
return lead_id
def get_leads(
self,
event_name: Optional[str] = None,
limit: int = 50,
skip: int = 0
):
"""Get leads with optional filtering"""
query = {}
if event_name:
query["event_name"] = event_name
leads = self.collection.find(query).sort("created_at", -1).skip(skip).limit(limit)
return list(leads)
def count_leads(self, event_name: Optional[str] = None) -> int:
"""Count total leads"""
query = {}
if event_name:
query["event_name"] = event_name
return self.collection.count_documents(query)