API Rate Limiting Implementation


Why Rate Limiting Matters

Rate limiting protects APIs from abuse, DoS attacks, and unintentional overload. It ensures fair usage and maintains service quality for all consumers.

Rate Limiting Algorithms

Token Bucket

The most widely used algorithm: tokens drip into a bucket at a fixed rate and each request spends one, so clients can burst up to the bucket's capacity while their long-run average stays at the refill rate:

```python
import threading
import time

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate                      # Tokens added per second
        self.capacity = capacity              # Maximum tokens (burst size)
        self.tokens = capacity                # Start full so early bursts succeed
        self.last_refill = time.monotonic()   # Monotonic clock is immune to wall-clock jumps
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        """Try to take tokens from the bucket; return True if the request is allowed."""
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

# Usage: 10 requests/second on average, with bursts of up to 20
bucket = TokenBucket(rate=10, capacity=20)

def handle_request():
    if bucket.consume():
        return process_request()   # Application handler, defined elsewhere
    return "429 Too Many Requests"
```

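The burst-then-throttle behavior is easy to see in a quick sketch (illustrative only; exact output depends on timing):

```python
demo = TokenBucket(rate=2, capacity=5)
results = [demo.consume() for _ in range(10)]
print(results)  # Roughly: the first 5 are True (the burst), the rest False until tokens refill
```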

Sliding Window Log

More precise, because it keeps an exact timestamp for every accepted request, but memory use grows with traffic:

```python
import time
from collections import deque

# Note: unlike TokenBucket above, this class is not thread-safe;
# guard allow_request() with a lock under concurrent access.
class SlidingWindowLog:
    def __init__(self, window_size=60, max_requests=100):
        self.window_size = window_size    # Window length in seconds
        self.max_requests = max_requests  # Allowed requests per window
        self.log = deque()                # Timestamps of accepted requests

    def allow_request(self):
        now = time.time()
        # Evict timestamps that have aged out of the window
        while self.log and self.log[0] <= now - self.window_size:
            self.log.popleft()

        if len(self.log) < self.max_requests:
            self.log.append(now)
            return True
        return False
```

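In practice you typically keep one log per client. A minimal sketch, assuming the class above (the `client_limiters` registry and key scheme are illustrative):

```python
from collections import defaultdict

# One independent limiter per client id (hypothetical registry)
client_limiters = defaultdict(lambda: SlidingWindowLog(window_size=60, max_requests=100))

def allow(client_id):
    return client_limiters[client_id].allow_request()
```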

Rate Limiting Headers

Return rate limit headers so clients can see their quota and back off before hitting the limit. The `X-RateLimit-*` names are a de facto convention rather than a formal standard:

```python
import time

def rate_limit_response(allowed, limit, remaining, reset):
    """Build rate limit headers; returns (headers, status_code)."""
    if allowed:
        return {
            "X-RateLimit-Limit": str(limit),          # Requests allowed per window
            "X-RateLimit-Remaining": str(remaining),  # Requests left in this window
            "X-RateLimit-Reset": str(reset),          # Unix time when the window resets
        }, 200
    return {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": "0",
        "X-RateLimit-Reset": str(reset),
        "Retry-After": str(max(0, reset - int(time.time()))),  # Seconds to wait, never negative
    }, 429
```

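A sketch of how this might be wired into an endpoint, assuming Flask and the `TokenBucket` and helper defined above; the `reset` value is approximate, since a token bucket refills continuously rather than on a fixed boundary:

```python
import time

from flask import Flask, jsonify

app = Flask(__name__)
bucket = TokenBucket(rate=10, capacity=20)

@app.route("/api/resource")
def resource():
    allowed = bucket.consume()
    headers, status = rate_limit_response(
        allowed,
        limit=20,
        remaining=int(bucket.tokens),
        reset=int(time.time()) + 1,  # Rough estimate: tokens trickle back continuously
    )
    body = {"ok": True} if allowed else {"error": "rate limit exceeded"}
    return jsonify(body), status, headers
```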

Distributed Redis Implementation

For multi-server deployments, counters must live in shared storage so every instance enforces the same combined limit. A Redis sorted set scored by timestamp works well, since expiring old entries is a single range deletion:

```python
import time

import redis

class RedisSlidingWindow:
    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, key, max_requests=100, window_seconds=60):
        now = int(time.time() * 1000)                    # Millisecond timestamps
        window_start = now - (window_seconds * 1000)

        # redis-py pipelines run as a MULTI/EXEC transaction by default,
        # so these four commands execute atomically.
        pipeline = self.redis.pipeline()
        pipeline.zremrangebyscore(key, 0, window_start)  # Drop entries older than the window
        pipeline.zcard(key)                              # Count what remains
        pipeline.zadd(key, {str(now): now})              # Record this request
        pipeline.expire(key, window_seconds * 2)         # Garbage-collect idle keys
        _, count, _, _ = pipeline.execute()

        # Caveats: the request is recorded even when rejected, and two requests
        # arriving in the same millisecond share a member name and count once.
        return count < max_requests
```

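A usage sketch, assuming a Redis server on localhost; the key should identify the caller (user id, API key, or IP):

```python
r = redis.Redis(host="localhost", port=6379)
limiter = RedisSlidingWindow(r)

if limiter.is_allowed("rate:user:42", max_requests=100, window_seconds=60):
    print("allowed")
else:
    print("rejected")  # Respond with 429 in a real handler
```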

Middleware Implementation




In Node.js, the express-rate-limit package provides this as drop-in Express middleware:

```javascript
const express = require("express");
const rateLimit = require("express-rate-limit");

const app = express();

const apiLimiter = rateLimit({
  windowMs: 15 * 60 * 1000,  // 15-minute window
  max: 100,                  // 100 requests per window per key
  standardHeaders: true,     // Send draft-standard RateLimit-* headers
  legacyHeaders: false,      // Omit the legacy X-RateLimit-* headers
  message: { error: "Too many requests, please try again later." },
  keyGenerator: (req) => req.user?.id || req.ip,  // Per-user when authenticated, else per-IP
  skip: (req) => req.headers["x-internal"] === process.env.INTERNAL_TOKEN  // Exempt internal traffic
});

app.use("/api/", apiLimiter);
```

Conclusion

Choose the right rate limiting algorithm for your use case. Token bucket works well for most APIs. Use Redis for distributed rate limiting across multiple servers. Always return clear rate limit headers so clients can self-regulate. Monitor rate limit hit rates to tune thresholds over time.