| import asyncio | |
| import math | |
| import time | |
| from vsp.shared import logger_factory | |
| logger = logger_factory.get_logger(__name__) | |
| class BedrockRateLimiter: | |
| """ | |
| A rate limiter for AWS Bedrock API calls. | |
| This class implements a token bucket algorithm to manage API request rates. | |
| """ | |
| def __init__(self, rate: int, per: float = 60.0): | |
| """ | |
| Initialize the BedrockRateLimiter. | |
| Args: | |
| rate (int): The number of requests allowed per time period. | |
| per (float): The time period in seconds. Defaults to 60.0. | |
| """ | |
| self.rate = rate | |
| self.per = per | |
| self.allowance = rate | |
| self.last_check = time.time() | |
| async def acquire(self) -> None: | |
| """ | |
| Acquire permission to make an API call, respecting rate limits. | |
| This method implements a token bucket algorithm. If the rate limit is exceeded, | |
| it will pause execution for an appropriate amount of time. | |
| Raises: | |
| No specific exceptions are raised, but the method may cause the execution | |
| to sleep for up to 30 seconds if the rate limit is exceeded. | |
| """ | |
| now = time.time() | |
| time_passed = now - self.last_check | |
| self.last_check = now | |
| self.allowance += math.ceil(time_passed * (self.rate / self.per)) | |
| if self.allowance > self.rate: | |
| self.allowance = self.rate | |
| if self.allowance < 1: | |
| logger.info("Rate limit exceeded", sleep_time=30) | |
| await asyncio.sleep(30) | |
| self.allowance = 0 | |
| else: | |
| self.allowance -= 1 | |