lamhieu commited on
Commit
3728af4
·
1 Parent(s): 6bcf9a9

chore: update something

Browse files
Files changed (1) hide show
  1. lightweight_embeddings/analytics.py +115 -54
lightweight_embeddings/analytics.py CHANGED
@@ -159,6 +159,8 @@ class Analytics:
159
  """
160
  loop = asyncio.get_running_loop()
161
  async with self.lock:
 
 
162
  # Reset local structures
163
  self.current_totals = {
164
  "access": defaultdict(lambda: defaultdict(int)),
@@ -169,67 +171,126 @@ class Analytics:
169
  "tokens": defaultdict(lambda: defaultdict(int)),
170
  }
171
 
 
 
172
  cursor = 0
173
- while True:
174
- scan_result = await loop.run_in_executor(
175
- None,
176
- partial(
177
- self.redis_client.scan,
178
- cursor=cursor,
179
- match="analytics:access:*",
180
- count=1000,
181
- ),
182
- )
183
- cursor, keys = scan_result[0], scan_result[1]
184
-
185
- for key in keys:
186
- # key is "analytics:access:<period>"
187
- period = key.replace("analytics:access:", "")
188
- data = await loop.run_in_executor(
189
- None, partial(self.redis_client.hgetall, key)
190
  )
191
- # Ensure data is not None and handle empty results
192
- if data:
193
- for model_id, count_str in data.items():
194
- try:
195
- self.current_totals["access"][period][model_id] = int(count_str)
196
- except (ValueError, TypeError):
197
- logger.warning("Invalid count value for model %s in period %s: %s", model_id, period, count_str)
198
- self.current_totals["access"][period][model_id] = 0
199
-
200
- if cursor == 0:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
201
  break
 
 
 
202
 
 
 
203
  cursor = 0
204
- while True:
205
- scan_result = await loop.run_in_executor(
206
- None,
207
- partial(
208
- self.redis_client.scan,
209
- cursor=cursor,
210
- match="analytics:tokens:*",
211
- count=1000,
212
- ),
213
- )
214
- cursor, keys = scan_result[0], scan_result[1]
215
-
216
- for key in keys:
217
- # key is "analytics:tokens:<period>"
218
- period = key.replace("analytics:tokens:", "")
219
- data = await loop.run_in_executor(
220
- None, partial(self.redis_client.hgetall, key)
221
  )
222
- # Ensure data is not None and handle empty results
223
- if data:
224
- for model_id, count_str in data.items():
225
- try:
226
- self.current_totals["tokens"][period][model_id] = int(count_str)
227
- except (ValueError, TypeError):
228
- logger.warning("Invalid token count value for model %s in period %s: %s", model_id, period, count_str)
229
- self.current_totals["tokens"][period][model_id] = 0
230
-
231
- if cursor == 0:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
232
  break
 
 
 
 
 
 
233
 
234
  async def _sync_to_redis(self):
235
  """
 
159
  """
160
  loop = asyncio.get_running_loop()
161
  async with self.lock:
162
+ logger.info("Starting sync from Upstash Redis...")
163
+
164
  # Reset local structures
165
  self.current_totals = {
166
  "access": defaultdict(lambda: defaultdict(int)),
 
171
  "tokens": defaultdict(lambda: defaultdict(int)),
172
  }
173
 
174
+ # Sync access data
175
+ logger.debug("Scanning access analytics keys...")
176
  cursor = 0
177
+ scan_attempts = 0
178
+ max_scan_attempts = 100 # Prevent infinite loops
179
+
180
+ while scan_attempts < max_scan_attempts:
181
+ try:
182
+ scan_result = await loop.run_in_executor(
183
+ None,
184
+ partial(
185
+ self.redis_client.scan,
186
+ cursor=cursor,
187
+ match="analytics:access:*",
188
+ count=10000,
189
+ ),
 
 
 
 
190
  )
191
+
192
+ # Handle different return formats from Upstash Redis
193
+ if isinstance(scan_result, (list, tuple)) and len(scan_result) >= 2:
194
+ cursor, keys = scan_result[0], scan_result[1]
195
+ else:
196
+ logger.warning("Unexpected scan result format: %s", scan_result)
197
+ break
198
+
199
+ # Convert cursor to int if it's a string
200
+ if isinstance(cursor, str):
201
+ cursor = int(cursor) if cursor.isdigit() else 0
202
+
203
+ logger.debug("Scanned access keys: cursor=%s, found %d keys", cursor, len(keys) if keys else 0)
204
+
205
+ if keys:
206
+ for key in keys:
207
+ # key is "analytics:access:<period>"
208
+ period = key.replace("analytics:access:", "")
209
+ data = await loop.run_in_executor(
210
+ None, partial(self.redis_client.hgetall, key)
211
+ )
212
+ # Ensure data is not None and handle empty results
213
+ if data:
214
+ for model_id, count_str in data.items():
215
+ try:
216
+ self.current_totals["access"][period][model_id] = int(count_str)
217
+ except (ValueError, TypeError):
218
+ logger.warning("Invalid count value for model %s in period %s: %s", model_id, period, count_str)
219
+ self.current_totals["access"][period][model_id] = 0
220
+
221
+ if cursor == 0:
222
+ break
223
+
224
+ scan_attempts += 1
225
+
226
+ except Exception as e:
227
+ logger.error("Error during access scan attempt %d: %s", scan_attempts, e)
228
  break
229
+
230
+ if scan_attempts >= max_scan_attempts:
231
+ logger.warning("Max scan attempts reached for access keys. Stopping scan.")
232
 
233
+ # Sync token data
234
+ logger.debug("Scanning token analytics keys...")
235
  cursor = 0
236
+ scan_attempts = 0
237
+ max_scan_attempts = 100 # Prevent infinite loops
238
+
239
+ while scan_attempts < max_scan_attempts:
240
+ try:
241
+ scan_result = await loop.run_in_executor(
242
+ None,
243
+ partial(
244
+ self.redis_client.scan,
245
+ cursor=cursor,
246
+ match="analytics:tokens:*",
247
+ count=1000,
248
+ ),
 
 
 
 
249
  )
250
+
251
+ # Handle different return formats from Upstash Redis
252
+ if isinstance(scan_result, (list, tuple)) and len(scan_result) >= 2:
253
+ cursor, keys = scan_result[0], scan_result[1]
254
+ else:
255
+ logger.warning("Unexpected scan result format: %s", scan_result)
256
+ break
257
+
258
+ # Convert cursor to int if it's a string
259
+ if isinstance(cursor, str):
260
+ cursor = int(cursor) if cursor.isdigit() else 0
261
+
262
+ logger.debug("Scanned token keys: cursor=%s, found %d keys", cursor, len(keys) if keys else 0)
263
+
264
+ if keys:
265
+ for key in keys:
266
+ # key is "analytics:tokens:<period>"
267
+ period = key.replace("analytics:tokens:", "")
268
+ data = await loop.run_in_executor(
269
+ None, partial(self.redis_client.hgetall, key)
270
+ )
271
+ # Ensure data is not None and handle empty results
272
+ if data:
273
+ for model_id, count_str in data.items():
274
+ try:
275
+ self.current_totals["tokens"][period][model_id] = int(count_str)
276
+ except (ValueError, TypeError):
277
+ logger.warning("Invalid token count value for model %s in period %s: %s", model_id, period, count_str)
278
+ self.current_totals["tokens"][period][model_id] = 0
279
+
280
+ if cursor == 0:
281
+ break
282
+
283
+ scan_attempts += 1
284
+
285
+ except Exception as e:
286
+ logger.error("Error during token scan attempt %d: %s", scan_attempts, e)
287
  break
288
+
289
+ if scan_attempts >= max_scan_attempts:
290
+ logger.warning("Max scan attempts reached for token keys. Stopping scan.")
291
+
292
+ logger.info("Completed sync from Upstash Redis. Loaded %d access periods, %d token periods.",
293
+ len(self.current_totals["access"]), len(self.current_totals["tokens"]))
294
 
295
  async def _sync_to_redis(self):
296
  """