Rate Limiting
限流¶
限流中间件用于控制 API 请求频率,防止滥用和保护服务器资源。
概述¶
限流(Rate Limiting)的主要目的:
- 防止滥用:限制恶意用户的请求频率
- 保护资源:防止服务器过载
- 公平使用:确保所有用户都能公平访问
- 防止 DDoS:缓解分布式拒绝服务攻击
快速开始¶
1. 创建存储¶
2. 配置限流规则¶
from srf.middleware.throttlemiddleware import IPRateLimit, UserRateLimit
app.config.RequestLimiter = [
IPRateLimit(100, 60, storage), # IP: 100次/分钟
UserRateLimit(1000, 60, storage), # 用户: 1000次/分钟
]
3. 注册中间件¶
from srf.middleware.throttlemiddleware import throttle_rate
from sanic.response import json
@app.middleware("request")
async def throttle_middleware(request):
"""限流中间件"""
if not await throttle_rate(request):
return json(
{"error": "请求过于频繁,请稍后再试"},
status=429
)
存储类¶
MemoryStorage¶
基于内存的存储,适用于单机部署。
特点:
- 简单易用
- 性能高
- 数据不持久化
- 不支持多实例共享
适用场景:
- 开发环境
- 单机部署
- 小规模应用
Redis 存储(自定义)¶
对于多实例部署,建议使用 Redis:
import aioredis
import time
class RedisStorage:
"""Redis 存储"""
def __init__(self, redis_pool):
self.redis = redis_pool
async def add(self, key):
"""添加请求记录"""
now = time.time()
await self.redis.zadd(key, now, now)
async def count(self, key, window):
"""统计时间窗口内的请求数"""
now = time.time()
cutoff = now - window
# 删除过期记录
await self.redis.zremrangebyscore(key, '-inf', cutoff)
# 统计数量
return await self.redis.zcard(key)
async def cleanup(self, key, window):
"""清理过期数据"""
now = time.time()
cutoff = now - window
await self.redis.zremrangebyscore(key, '-inf', cutoff)
# 使用
redis_pool = await aioredis.create_redis_pool('redis://localhost:6379')
storage = RedisStorage(redis_pool)
限流策略¶
IPRateLimit - IP 限流¶
基于客户端 IP 地址限流。
from srf.middleware.throttlemiddleware import IPRateLimit
# 100次/60秒
limiter = IPRateLimit(
limit=100, # 最大请求数
window=60, # 时间窗口(秒)
storage=storage # 存储实例
)
适用场景:
- 防止单个 IP 滥用
- 防止简单的 DDoS 攻击
- 限制匿名用户
注意事项:
- NAT 网络下多个用户共享 IP
- 使用代理或 VPN 可以绕过
UserRateLimit - 用户限流¶
基于已认证用户限流。
from srf.middleware.throttlemiddleware import UserRateLimit
# 1000次/60秒
limiter = UserRateLimit(
limit=1000,
window=60,
storage=storage
)
适用场景:
- 限制认证用户的请求频率
- 不同用户级别使用不同限制
- 防止账号滥用
特点:
- 更精确的控制
- 可以区分用户级别
- 需要用户已登录
PathRateLimit - 路径限流¶
基于请求路径限流。
from srf.middleware.throttlemiddleware import PathRateLimit
# 特定路径限流
limiter = PathRateLimit(
limit=10,
window=60,
storage=storage
)
适用场景:
- 保护特定的高成本端点
- 限制敏感操作(如密码重置)
- 不同端点使用不同限制
HeaderRateLimit - 请求头限流¶
基于自定义请求头限流。
from srf.middleware.throttlemiddleware import HeaderRateLimit
# 基于 API Key 限流
limiter = HeaderRateLimit(
limit=500,
window=60,
storage=storage,
header_name='X-API-Key'
)
适用场景:
- API Key 限流
- 第三方集成限流
- 租户隔离
组合使用¶
可以同时使用多个限流策略:
from srf.middleware.throttlemiddleware import (
MemoryStorage,
IPRateLimit,
UserRateLimit,
PathRateLimit,
)
storage = MemoryStorage()
app.config.RequestLimiter = [
# IP 限流:防止恶意攻击
IPRateLimit(100, 60, storage),
# 用户限流:针对认证用户
UserRateLimit(1000, 60, storage),
# 路径限流:保护特定端点
PathRateLimit(10, 60, storage),
]
@app.middleware("request")
async def throttle_middleware(request):
if not await throttle_rate(request):
from sanic.response import json
return json({"error": "请求过于频繁"}, status=429)
检查顺序:
- 所有限流器按顺序检查
- 任一限流器拒绝请求,立即返回 429
- 所有限流器通过,继续处理请求
自定义限流器¶
创建自定义限流器¶
from srf.middleware.throttlemiddleware import BaseRateLimit
import time
class ApiKeyRateLimit(BaseRateLimit):
"""基于 API Key 的限流"""
def __init__(self, limit, window, storage, key_limits=None):
super().__init__(limit, window, storage)
self.key_limits = key_limits or {}
def get_key(self, request):
"""生成限流键"""
api_key = request.headers.get('X-API-Key')
if not api_key:
return None
return f"api_key:{api_key}"
def get_limit(self, request):
"""获取限制数(可以为不同 API Key 设置不同限制)"""
api_key = request.headers.get('X-API-Key')
return self.key_limits.get(api_key, self.limit)
async def allow(self, request):
"""检查是否允许请求"""
key = self.get_key(request)
if not key:
return True
# 获取当前计数
count = await self.storage.count(key, self.window)
limit = self.get_limit(request)
if count >= limit:
return False
# 记录请求
await self.storage.add(key)
return True
# 使用
limiter = ApiKeyRateLimit(
limit=100,
window=60,
storage=storage,
key_limits={
'premium_key_1': 1000, # 高级用户
'basic_key_1': 100, # 基础用户
}
)
动态限流¶
根据服务器负载动态调整限制:
import psutil
class DynamicRateLimit(BaseRateLimit):
"""动态限流"""
def get_limit(self, request):
"""根据服务器负载动态调整限制"""
cpu_percent = psutil.cpu_percent()
if cpu_percent > 80:
# 高负载:减少限制
return self.limit // 2
elif cpu_percent > 60:
# 中等负载:正常限制
return self.limit
else:
# 低负载:放宽限制
return self.limit * 2
限流响应¶
添加限流信息头¶
@app.middleware("response")
async def add_rate_limit_headers(request, response):
"""添加限流信息到响应头"""
if hasattr(request.ctx, 'rate_limit_info'):
info = request.ctx.rate_limit_info
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
response.headers['X-RateLimit-Reset'] = str(info['reset'])
修改限流器以提供信息:
class EnhancedRateLimit(BaseRateLimit):
async def allow(self, request):
key = self.get_key(request)
count = await self.storage.count(key, self.window)
# 保存限流信息到请求上下文
import time
request.ctx.rate_limit_info = {
'limit': self.limit,
'remaining': max(0, self.limit - count),
'reset': int(time.time() + self.window)
}
if count >= self.limit:
return False
await self.storage.add(key)
return True
自定义错误响应¶
@app.middleware("request")
async def throttle_middleware(request):
if not await throttle_rate(request):
# 获取限流信息
info = getattr(request.ctx, 'rate_limit_info', {})
from sanic.response import json
return json({
"error": "请求过于频繁",
"message": f"您已超过限制({info.get('limit', 'N/A')} 次/分钟)",
"retry_after": info.get('reset', 60)
}, status=429, headers={
'Retry-After': str(info.get('reset', 60))
})
完整示例¶
from sanic import Sanic
from srf.middleware.throttlemiddleware import (
MemoryStorage,
IPRateLimit,
UserRateLimit,
PathRateLimit,
throttle_rate
)
from sanic.response import json
app = Sanic("MyApp")
# 创建存储
storage = MemoryStorage()
# 配置限流规则
app.config.RequestLimiter = [
# IP 限流:100 次/分钟
IPRateLimit(100, 60, storage),
# 用户限流:1000 次/分钟
UserRateLimit(1000, 60, storage),
# 路径限流:特定路径 10 次/分钟
PathRateLimit(10, 60, storage),
]
# 限流中间件
@app.middleware("request")
async def throttle_middleware(request):
"""限流中间件"""
# 跳过健康检查端点
if request.path == '/health/':
return
if not await throttle_rate(request):
return json({
"error": "请求过于频繁",
"message": "请稍后再试"
}, status=429, headers={
'Retry-After': '60'
})
# 添加限流信息头
@app.middleware("response")
async def add_rate_limit_headers(request, response):
"""添加限流信息"""
if hasattr(request.ctx, 'rate_limit_info'):
info = request.ctx.rate_limit_info
response.headers['X-RateLimit-Limit'] = str(info.get('limit', ''))
response.headers['X-RateLimit-Remaining'] = str(info.get('remaining', ''))
response.headers['X-RateLimit-Reset'] = str(info.get('reset', ''))
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
最佳实践¶
- 分级限流:为不同用户级别设置不同限制
- 合理的限制:不要太严格影响正常使用
- 清晰的错误信息:告诉用户何时可以重试
- 监控和告警:监控限流触发情况
- 白名单:为特定 IP 或用户设置白名单
- 日志记录:记录被限流的请求
- 使用 Redis:生产环境使用 Redis 存储
性能优化¶
1. 使用 Redis¶
import aioredis
@app.before_server_start
async def setup_redis(app, loop):
app.ctx.redis = await aioredis.create_redis_pool(
'redis://localhost:6379',
minsize=5,
maxsize=10
)
# 使用 Redis 存储
storage = RedisStorage(app.ctx.redis)
2. 定期清理¶
from sanic import Sanic
import asyncio
async def cleanup_task(app):
"""定期清理过期数据"""
while True:
await asyncio.sleep(300) # 5分钟
# 清理逻辑
# ...
@app.before_server_start
async def start_cleanup(app, loop):
app.add_task(cleanup_task(app))
3. 批量操作¶
# 批量获取多个键的计数
async def get_counts_batch(keys, window):
pipeline = redis.pipeline()
for key in keys:
pipeline.zcount(key, time.time() - window, '+inf')
return await pipeline.execute()
监控和告警¶
记录限流事件¶
import logging
logger = logging.getLogger(__name__)
@app.middleware("request")
async def throttle_middleware(request):
if not await throttle_rate(request):
# 记录限流事件
logger.warning(
f"Rate limit exceeded: "
f"IP={request.ip}, "
f"Path={request.path}, "
f"User={getattr(request.ctx, 'user', None)}"
)
# 发送告警
# await send_alert(...)
return json({"error": "请求过于频繁"}, status=429)
统计指标¶
from prometheus_client import Counter
rate_limit_exceeded = Counter(
'rate_limit_exceeded_total',
'Total rate limit exceeded',
['path', 'method']
)
@app.middleware("request")
async def throttle_middleware(request):
if not await throttle_rate(request):
rate_limit_exceeded.labels(
path=request.path,
method=request.method
).inc()
return json({"error": "请求过于频繁"}, status=429)