Spaces:
Paused
Paused
File size: 5,096 Bytes
4efde5d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
#!/usr/bin/env python3
"""
Suna Agent Installation Script for Individual Users
Simple script to install Suna agents for users by email address.
Usage:
# Install Suna for a user
python install_suna_for_user.py user@example.com
# Install with replacement (if agent already exists)
python install_suna_for_user.py user@example.com --replace
Examples:
python install_suna_for_user.py john.doe@company.com
python install_suna_for_user.py admin@example.org --replace
"""
import asyncio
import argparse
import sys
from pathlib import Path
from typing import Optional, Dict, Any
backend_dir = Path(__file__).parent.parent.parent
sys.path.insert(0, str(backend_dir))
from utils.suna_default_agent_service import SunaDefaultAgentService
from services.supabase import DBConnection
from utils.logger import logger
class SunaUserInstaller:
    """Install the Suna agent for one user, looked up by email address.

    Holds a ``DBConnection`` and a ``SunaDefaultAgentService`` constructed
    from that connection. ``initialize()`` must be awaited before use.
    """

    # Deletion table: strips the separators that commonly differ between an
    # email local part and an account name/slug ("john.doe" vs "john-doe").
    _SEPARATORS = str.maketrans('', '', '.-_')

    def __init__(self):
        self.db = DBConnection()
        self.service = SunaDefaultAgentService(self.db)

    async def initialize(self):
        """Initialize the underlying database connection."""
        await self.db.initialize()

    @classmethod
    def _normalize(cls, value: Optional[str]) -> str:
        """Lower-case ``value`` and drop '.', '-' and '_' (``None`` -> '')."""
        return (value or '').lower().translate(cls._SEPARATORS)

    async def get_account_by_email(self, email: str) -> Optional[Dict[str, Any]]:
        """Find the account that belongs to ``email``.

        Lookup order:
          1. The ``get_user_account_by_email`` RPC, called with the
             lower-cased email.
          2. If the RPC raises or yields no data: scan personal accounts in
             ``basejump.accounts`` for a name or slug equal to the email's
             local part (case-insensitive), then for one that is equal after
             removing '.', '-' and '_'.

        NOTE(review): step 2 is a heuristic that ignores the email domain, so
        it can select an account belonging to a different user with the same
        handle. Confirm this is acceptable for the environments it runs in.

        Args:
            email: Email address of the user.

        Returns:
            The matching account mapping, or ``None`` when nothing matches.

        Raises:
            Exception: any error outside the RPC attempt is logged and
                re-raised unchanged.
        """
        try:
            client = await self.db.client

            try:
                result = await client.rpc('get_user_account_by_email', {
                    'email_input': email.lower()
                }).execute()
                data = result.data
                if data:
                    # The annotation promises a single mapping and the caller
                    # indexes it by key, so reduce a row list to its first
                    # row. Presumably the RPC returns one object; verify
                    # against the migration named below.
                    if isinstance(data, list):
                        return data[0]
                    return data
            except Exception as e:
                # Deliberate best-effort: any RPC failure falls through to
                # the name-matching heuristic instead of aborting.
                logger.warning(f"RPC function not available: {e}")
                logger.info("Please run migration: 20250121_get_user_account_by_email.sql")
                logger.info("Falling back to name matching...")

            result = await client.schema('basejump').table('accounts').select(
                'id',
                'name',
                'slug',
                'primary_owner_user_id'
            ).eq('personal_account', True).execute()

            accounts = result.data
            if not accounts:
                return None

            email_prefix = email.split('@')[0].lower()

            # Pass 1: exact (case-insensitive) match wins over any fuzzy one.
            for account in accounts:
                account_name = (account.get('name') or '').lower()
                account_slug = (account.get('slug') or '').lower()
                if email_prefix in (account_name, account_slug):
                    return account

            # Pass 2: match after stripping separators on both sides.
            email_prefix_normalized = self._normalize(email_prefix)
            for account in accounts:
                candidates = (
                    self._normalize(account.get('name')),
                    self._normalize(account.get('slug')),
                )
                if email_prefix_normalized in candidates:
                    return account

            return None

        except Exception as e:
            logger.error(f"Error looking up account by email: {e}")
            raise

    async def install_for_email(self, email: str, replace: bool = False):
        """Look up the account for ``email`` and install the Suna agent on it.

        Progress and the outcome are printed to stdout. Nothing is returned;
        a missing account or a falsy agent id from the service is reported
        with a message rather than an exception.

        Args:
            email: Email address of the user.
            replace: Forwarded to the service as ``replace_existing``.
        """
        print(f"🔍 Looking for account with email: {email}")

        account = await self.get_account_by_email(email)
        if not account:
            print(f"❌ No account found for email: {email}")
            print(" Make sure the user has signed up and has a personal account")
            return

        account_id = account['id']
        # The name is display-only; a payload without it must not abort the install.
        print(f"✅ Found account: {account.get('name')} ({account_id})")

        print("🚀 Installing Suna agent...")
        agent_id = await self.service.install_suna_agent_for_user(
            account_id,
            replace_existing=replace
        )

        if agent_id:
            print("✅ Successfully installed Suna agent!")
            print(f" 🤖 Agent ID: {agent_id}")
            print(f" 👤 User: {email}")
            print(f" 📦 Account: {account_id}")
        else:
            print(f"❌ Failed to install Suna agent for {email}")
async def main():
    """Command-line entry point: install the Suna agent for one email.

    Parses the positional ``email`` argument and the optional ``--replace``
    flag, initializes a ``SunaUserInstaller`` and runs the installation.
    Any exception is printed, logged with its traceback, and turned into
    exit status 1. A ``KeyboardInterrupt`` only prints a cancellation notice.
    """
    parser = argparse.ArgumentParser(
        description="Install Suna agent for a user by email",
        # Raw formatter keeps the module docstring's line breaks in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument('email', help='Email address of the user')
    parser.add_argument('--replace', action='store_true',
                        help='Replace existing Suna agent if present')
    args = parser.parse_args()

    installer = SunaUserInstaller()
    try:
        await installer.initialize()
        await installer.install_for_email(args.email, args.replace)
    except KeyboardInterrupt:
        print("\n⚠️ Operation cancelled by user")
    except Exception as e:
        print(f"❌ Error: {e}")
        logger.error(f"Script error: {e}", exc_info=True)
        sys.exit(1)
# Run the async entry point only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())