"""限流控制""" import time import logging from collections import deque from typing import Dict from config import MESSAGE_RATE_LIMIT logger = logging.getLogger(__name__) class RateLimiter: """令牌桶限流器""" def __init__(self, max_requests: int = MESSAGE_RATE_LIMIT, window: int = 60): """初始化限流器 Args: max_requests: 时间窗口内最大请求数 window: 时间窗口(秒) """ self.max_requests = max_requests self.window = window # 使用deque存储时间戳 self.requests: deque = deque() logger.info(f"限流器已启用: {max_requests}条/{window}秒") def is_allowed(self) -> bool: """检查是否允许请求 Returns: 是否允许 """ current_time = time.time() # 清理过期的请求记录 while self.requests and self.requests[0] < current_time - self.window: self.requests.popleft() # 检查是否超过限制 if len(self.requests) < self.max_requests: self.requests.append(current_time) return True else: logger.warning(f"触发限流: 已达到 {self.max_requests}条/{self.window}秒") return False def get_remaining(self) -> int: """获取剩余可用次数""" current_time = time.time() # 清理过期的请求记录 while self.requests and self.requests[0] < current_time - self.window: self.requests.popleft() return max(0, self.max_requests - len(self.requests)) def get_reset_time(self) -> float: """获取重置时间(秒)""" if not self.requests: return 0 oldest_request = self.requests[0] reset_time = oldest_request + self.window - time.time() return max(0, reset_time) # 全局限流器实例(单例) _rate_limiter: RateLimiter = RateLimiter() def get_rate_limiter() -> RateLimiter: """获取全局限流器实例""" return _rate_limiter