File size: 4,997 Bytes
f2bab5e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""

CouchDB client for distributed coordination.

"""
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any

import couchdb

from .config import settings

class CouchDBClient:
    """Client for interacting with CouchDB for distributed coordination.

    The client works against four databases: ``agents`` (one document per
    registered agent), ``jobs`` (the job queue), ``gradients`` (gradient
    payloads keyed by a generated id) and ``model_state`` (model snapshots).
    Every timestamp written by this class is a naive-UTC ISO-8601 string.
    """

    # Databases created at start-up when the server does not have them yet.
    REQUIRED_DATABASES = ('agents', 'jobs', 'gradients', 'model_state')

    def __init__(self):
        """Connect with the configured URL/credentials and create missing databases."""
        self.server = couchdb.Server(settings.COUCHDB_URL)
        self.server.resource.credentials = (
            settings.COUCHDB_USER,
            settings.COUCHDB_PASSWORD,
        )
        self._ensure_databases()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same text as the deprecated ``datetime.utcnow().isoformat()``
        (no ``+00:00`` suffix), so new documents stay string-comparable with
        documents that were stored earlier.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _ensure_databases(self):
        """Ensure required databases exist."""
        for db_name in self.REQUIRED_DATABASES:
            if db_name in self.server:
                continue
            try:
                self.server.create(db_name)
            except couchdb.http.PreconditionFailed:
                # Another client created it between the check and the create;
                # the database exists, which is all that is required here.
                pass

    def register_agent(self, agent_id: str, capabilities: Dict[str, Any]) -> bool:
        """Register an agent in the cluster.

        Args:
            agent_id: Used as the document ``_id`` in the ``agents`` database.
            capabilities: Stored verbatim under ``capabilities``.

        Returns:
            True when the document was saved, False when the save hit a
            ``ResourceConflict`` (a document with this id already exists).
        """
        db = self.server['agents']
        doc = {
            '_id': agent_id,
            'status': 'active',
            'capabilities': capabilities,
            'last_heartbeat': self._utc_timestamp(),
            'current_job': None,
        }
        try:
            db.save(doc)
        except couchdb.http.ResourceConflict:
            return False
        return True

    def update_heartbeat(self, agent_id: str) -> bool:
        """Update agent heartbeat.

        Returns:
            True when the heartbeat was written, False when no agent document
            with ``agent_id`` exists.

        Note:
            A ``ResourceConflict`` raised by the save is not caught here and
            propagates to the caller.
        """
        db = self.server['agents']
        try:
            doc = db[agent_id]
            doc['last_heartbeat'] = self._utc_timestamp()
            db.save(doc)
        except couchdb.http.ResourceNotFound:
            return False
        return True

    def create_job(self, job_type: str, params: Dict[str, Any]) -> str:
        """Create a new job in the job queue.

        The job is stored with status ``pending`` and no assignee.

        Returns:
            The generated job id (a UUID4 string), also used as the document id.
        """
        db = self.server['jobs']
        job_id = str(uuid.uuid4())
        db.save({
            '_id': job_id,
            'type': job_type,
            'params': params,
            'status': 'pending',
            'created_at': self._utc_timestamp(),
            'assigned_to': None,
        })
        return job_id

    def claim_job(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """Attempt to claim a pending job.

        Scans every document in the ``jobs`` database and tries to mark the
        first ``pending`` one as ``in_progress`` for ``agent_id``. If the save
        conflicts (the document changed since it was read), the scan moves on
        to the next pending job.

        Returns:
            The claimed job document, or None when nothing could be claimed.
        """
        db = self.server['jobs']
        for row in db.view('_all_docs', include_docs=True):
            doc = row.doc
            # Rows without a body, and documents that are not pending jobs
            # (e.g. design documents), cannot be claimed.
            if doc is None or doc.get('status') != 'pending':
                continue
            doc['status'] = 'in_progress'
            doc['assigned_to'] = agent_id
            doc['claimed_at'] = self._utc_timestamp()
            try:
                db.save(doc)
            except couchdb.http.ResourceConflict:
                # Lost the race for this job; try the next one.
                continue
            return doc
        return None

    def update_job_status(self, job_id: str, status: str, result: Optional[Dict[str, Any]] = None) -> bool:
        """Update job status and optionally store results.

        Args:
            job_id: Id of the job document to update.
            status: New value for the ``status`` field.
            result: Stored under ``result`` whenever it is not None, so an
                empty dict is recorded as a (valid) empty result.

        Returns:
            True when the job was updated, False when ``job_id`` does not exist.

        Note:
            A ``ResourceConflict`` raised by the save is not caught here and
            propagates to the caller.
        """
        db = self.server['jobs']
        try:
            doc = db[job_id]
            doc['status'] = status
            if result is not None:
                doc['result'] = result
            doc['updated_at'] = self._utc_timestamp()
            db.save(doc)
        except couchdb.http.ResourceNotFound:
            return False
        return True

    def store_gradients(self, job_id: str, gradients: Dict[str, Any]) -> str:
        """Store computed gradients.

        Returns:
            The generated id (a UUID4 string) of the new gradient document.
        """
        db = self.server['gradients']
        gradient_id = str(uuid.uuid4())
        db.save({
            '_id': gradient_id,
            'job_id': job_id,
            'gradients': gradients,
            'timestamp': self._utc_timestamp(),
        })
        return gradient_id

    def get_active_agents(self) -> List[Dict[str, Any]]:
        """Get list of currently active agents.

        Returns:
            Every document in the ``agents`` database whose ``status`` field
            equals ``'active'``. Heartbeat age is not taken into account.
        """
        db = self.server['agents']
        return [
            row.doc
            for row in db.view('_all_docs', include_docs=True)
            if row.doc is not None and row.doc.get('status') == 'active'
        ]

    def store_model_state(self, state: Dict[str, Any]) -> str:
        """Store current model state.

        Returns:
            The generated id (a UUID4 string) of the new state document.
        """
        db = self.server['model_state']
        state_id = str(uuid.uuid4())
        db.save({
            '_id': state_id,
            'state': state,
            'timestamp': self._utc_timestamp(),
        })
        return state_id

    def get_latest_model_state(self) -> Optional[Dict[str, Any]]:
        """Retrieve the latest model state.

        ``_all_docs`` is ordered by document id, and state ids are random
        UUIDs, so "last row" says nothing about recency. The newest state is
        therefore selected by the stored ``timestamp`` field; the ISO strings
        written by ``store_model_state`` compare chronologically as text.

        This reads every document in ``model_state``; an indexed view on
        ``timestamp`` would be needed to avoid the full scan.

        Returns:
            The state document with the greatest ``timestamp``, or None when
            the database holds no timestamped document.
        """
        db = self.server['model_state']
        stamped_docs = (
            row.doc
            for row in db.view('_all_docs', include_docs=True)
            # Skips rows without a body and documents that are not state
            # snapshots (e.g. design documents).
            if row.doc is not None and 'timestamp' in row.doc
        )
        return max(stamped_docs, key=lambda doc: doc['timestamp'], default=None)