Design a Rate Limiter in Python


Implement a rate limiter with the following methods:

  • RateLimiter(int expire) constructs a new rate limiter with the given expire time.
  • limit(int uid, int timestamp) returns whether the user should be rate limited. A given user should be rate limited if that user had previously called limit without being rate limited after timestamp – expiry.

You can assume that timestamp is monotonically increasing.
Constraints
n ≤ 100,000 where n is the number of calls to limit.
Example 1
Input
methods = [“constructor”, “limit”, “limit”, “limit”, “limit”]
arguments = [[5], [0, 10], [0, 15], [0, 16], [1, 17]]
Output
[None, False, False, True, False]
Explanation

1
2
3
4
5
rl = RateLimiter(5)
rl.limit(0, 10) == False
rl.limit(0, 15) == False
rl.limit(0, 16) == True
rl.limit(1, 17) == False
rl = RateLimiter(5)
rl.limit(0, 10) == False
rl.limit(0, 15) == False
rl.limit(0, 16) == True
rl.limit(1, 17) == False

Example 2
Input
methods = [“constructor”, “limit”, “limit”, “limit”, “limit”]
arguments = [[0], [0, 1], [0, 1], [0, 2], [0, 3]]`
Output
[None, False, False, False, False]
Explanation

1
2
3
4
5
rl = RateLimiter(0)
rl.limit(0, 1) == False
rl.limit(0, 1) == False
rl.limit(0, 2) == False
rl.limit(1, 3) == False
rl = RateLimiter(0)
rl.limit(0, 1) == False
rl.limit(0, 1) == False
rl.limit(0, 2) == False
rl.limit(1, 3) == False

Example 3
Input
methods = [“constructor”, “limit”, “limit”, “limit”]
arguments = [[5], [0, 10], [0, 13], [0, 16]]`
Output
[None, False, True, False]
Explanation

1
2
3
4
rl = RateLimiter(5)
rl.limit(0, 10) == False
rl.limit(0, 13) == True
rl.limit(0, 16) == False
rl = RateLimiter(5)
rl.limit(0, 10) == False
rl.limit(0, 13) == True
rl.limit(0, 16) == False

Rate Limiter implemented by Queue

For each request, we use a hash map (or dictionary) to store it. The key is the request ID, and the value is the double-ended queue, where we keep a window of limit.

1
2
3
4
5
6
7
8
9
10
11
12
class RateLimiter:
    def __init__(self, expire):
        self.w = expire
        self.d = defaultdict(deque)
 
    def limit(self, uid, timestamp):
        while len(self.d[uid]) > 0 and timestamp - self.d[uid][0] >= self.w:
            self.d[uid].popleft()
        ans = len(self.d[uid]) > 0
        if not ans:
            self.d[uid].append(timestamp)
        return ans
class RateLimiter:
    def __init__(self, expire):
        self.w = expire
        self.d = defaultdict(deque)

    def limit(self, uid, timestamp):
        while len(self.d[uid]) > 0 and timestamp - self.d[uid][0] >= self.w:
            self.d[uid].popleft()
        ans = len(self.d[uid]) > 0
        if not ans:
            self.d[uid].append(timestamp)
        return ans

For each limit function, we remove the outdated requests in the queue, then accept this request only if there are non requests in the queue, otherwise we rate limit it.

See also: Easy Rate Limit in PHP using Simple Strategy – An API Example

–EOF (The Ultimate Computing & Technology Blog) —

GD Star Rating
loading...
426 words
Last Post: Teaching Kids Programming - Introduction to Permutation and Combination
Next Post: The C++ template for Printing the Vector/List/Set/Map by Overriding the cout Operator?

The Permanent URL is: Design a Rate Limiter in Python

Leave a Reply