navkast commited on
Commit
6abd06b
·
unverified ·
1 Parent(s): dce8143

Add per-minute rate limiter for linkedin (#6)

Browse files
src/vsp/app/scrapers/linkedin_downloader.py CHANGED
@@ -7,6 +7,7 @@ of requests in case of failures.
7
 
8
  Classes:
9
  LinkedInFetchFailedError: Custom exception for LinkedIn fetch failures.
 
10
  LinkedinDownloader: Main class for downloading LinkedIn profile data.
11
 
12
  Usage:
@@ -15,6 +16,8 @@ Usage:
15
  """
16
 
17
  import asyncio
 
 
18
  from typing import Final
19
 
20
  import aiohttp
@@ -30,6 +33,47 @@ class LinkedInFetchFailedError(Exception):
30
  """Custom exception raised when fetching LinkedIn profile data fails."""
31
 
32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  class LinkedinDownloader:
34
  """
35
  A class for asynchronously downloading LinkedIn profile data.
@@ -42,20 +86,23 @@ class LinkedinDownloader:
42
  _X_RAPIDAPI_HOST (Final[str]): The RapidAPI host for LinkedIn API.
43
  _api_key (str): The RapidAPI key for authentication.
44
  _semaphore (asyncio.Semaphore): Semaphore for limiting concurrent requests.
 
45
  """
46
 
47
  _URL: Final[str] = "https://linkedin-api8.p.rapidapi.com/"
48
  _X_RAPIDAPI_HOST: Final[str] = "linkedin-api8.p.rapidapi.com"
49
 
50
- def __init__(self, max_concurrency: int = 10):
51
  """
52
  Initialize the LinkedinDownloader.
53
 
54
  Args:
55
- max_concurrency (int): Maximum number of concurrent API calls. Defaults to 10.
 
56
  """
57
  self._api_key = self._fetch_api_key()
58
  self._semaphore = asyncio.Semaphore(max_concurrency)
 
59
 
60
  @staticmethod
61
  def _fetch_api_key() -> str:
@@ -116,6 +163,7 @@ class LinkedinDownloader:
116
  LinkedInFetchFailedError: If the API call fails after all retry attempts.
117
  """
118
  async with self._semaphore:
 
119
  headers, querystring = self._compose_request(linkedin_url)
120
  logger.info("Fetching LinkedIn profile", url=linkedin_url)
121
  async with aiohttp.ClientSession() as session:
 
7
 
8
  Classes:
9
  LinkedInFetchFailedError: Custom exception for LinkedIn fetch failures.
10
+ RateLimiter: Token bucket algorithm implementation for rate limiting.
11
  LinkedinDownloader: Main class for downloading LinkedIn profile data.
12
 
13
  Usage:
 
16
  """
17
 
18
  import asyncio
19
+ import math
20
+ import time
21
  from typing import Final
22
 
23
  import aiohttp
 
33
  """Custom exception raised when fetching LinkedIn profile data fails."""
34
 
35
 
36
+ class RateLimiter:
37
+ """
38
+ Implements a token bucket algorithm for rate limiting.
39
+
40
+ This class manages a token bucket to control the rate of API requests,
41
+ ensuring that the number of requests per minute does not exceed a specified limit.
42
+ """
43
+
44
+ def __init__(self, rate: int, per: float = 60.0):
45
+ """
46
+ Initialize the RateLimiter.
47
+
48
+ Args:
49
+ rate (int): The number of tokens (requests) allowed per time period.
50
+ per (float): The time period in seconds. Defaults to 60.0 (1 minute).
51
+ """
52
+ self.rate = rate
53
+ self.per = per
54
+ self.allowance = rate
55
+ self.last_check = time.monotonic()
56
+
57
+ async def acquire(self) -> None:
58
+ """
59
+ Acquire a token from the bucket, waiting if necessary.
60
+
61
+ This method implements the token bucket algorithm. If there are no tokens
62
+ available, it will sleep until a token becomes available.
63
+ """
64
+ current = time.monotonic()
65
+ time_passed = current - self.last_check
66
+ self.last_check = current
67
+ self.allowance += math.floor(time_passed * (self.rate / self.per))
68
+ if self.allowance > self.rate:
69
+ self.allowance = self.rate
70
+ if self.allowance < 1:
71
+ await asyncio.sleep((1 - self.allowance) * self.per / self.rate)
72
+ self.allowance = 0
73
+ else:
74
+ self.allowance -= 1
75
+
76
+
77
  class LinkedinDownloader:
78
  """
79
  A class for asynchronously downloading LinkedIn profile data.
 
86
  _X_RAPIDAPI_HOST (Final[str]): The RapidAPI host for LinkedIn API.
87
  _api_key (str): The RapidAPI key for authentication.
88
  _semaphore (asyncio.Semaphore): Semaphore for limiting concurrent requests.
89
+ _rate_limiter (RateLimiter): Rate limiter for controlling requests per minute.
90
  """
91
 
92
  _URL: Final[str] = "https://linkedin-api8.p.rapidapi.com/"
93
  _X_RAPIDAPI_HOST: Final[str] = "linkedin-api8.p.rapidapi.com"
94
 
95
+ def __init__(self, max_concurrency: int = 2, max_per_minute: int = 10):
96
  """
97
  Initialize the LinkedinDownloader.
98
 
99
  Args:
100
+ max_concurrency (int): Maximum number of concurrent API calls. Defaults to 2.
101
+ max_per_minute (int): Maximum number of requests per minute. Defaults to 5.
102
  """
103
  self._api_key = self._fetch_api_key()
104
  self._semaphore = asyncio.Semaphore(max_concurrency)
105
+ self._rate_limiter = RateLimiter(max_per_minute)
106
 
107
  @staticmethod
108
  def _fetch_api_key() -> str:
 
163
  LinkedInFetchFailedError: If the API call fails after all retry attempts.
164
  """
165
  async with self._semaphore:
166
+ await self._rate_limiter.acquire() # Acquire a token from the rate limiter
167
  headers, querystring = self._compose_request(linkedin_url)
168
  logger.info("Fetching LinkedIn profile", url=linkedin_url)
169
  async with aiohttp.ClientSession() as session: