初始化
This commit is contained in:
2
utils/__init__.py
Normal file
2
utils/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
"""工具模块"""
|
||||
|
||||
132
utils/message.py
Normal file
132
utils/message.py
Normal file
@@ -0,0 +1,132 @@
|
||||
"""WPS消息构造和发送工具"""
|
||||
import httpx
|
||||
import logging
|
||||
from typing import Dict, Any, Optional
|
||||
from config import WEBHOOK_URL
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MessageSender:
|
||||
"""消息发送器"""
|
||||
|
||||
def __init__(self, webhook_url: str = WEBHOOK_URL):
|
||||
"""初始化消息发送器
|
||||
|
||||
Args:
|
||||
webhook_url: Webhook URL
|
||||
"""
|
||||
self.webhook_url = webhook_url
|
||||
self.client: Optional[httpx.AsyncClient] = None
|
||||
|
||||
async def _get_client(self) -> httpx.AsyncClient:
|
||||
"""获取HTTP客户端(懒加载)"""
|
||||
if self.client is None:
|
||||
self.client = httpx.AsyncClient(timeout=10.0)
|
||||
return self.client
|
||||
|
||||
async def send_message(self, message: Dict[str, Any]) -> bool:
|
||||
"""发送消息到WPS
|
||||
|
||||
Args:
|
||||
message: 消息字典
|
||||
|
||||
Returns:
|
||||
是否发送成功
|
||||
"""
|
||||
try:
|
||||
client = await self._get_client()
|
||||
response = await client.post(self.webhook_url, json=message)
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(f"消息发送成功: {message.get('msgtype')}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"消息发送失败: status={response.status_code}, body={response.text}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"发送消息异常: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool:
|
||||
"""发送文本消息
|
||||
|
||||
Args:
|
||||
content: 文本内容
|
||||
at_user_id: @用户ID(可选)
|
||||
|
||||
Returns:
|
||||
是否发送成功
|
||||
"""
|
||||
# 如果需要@人
|
||||
if at_user_id:
|
||||
content = f'<at user_id="{at_user_id}"></at> {content}'
|
||||
|
||||
message = {
|
||||
"msgtype": "text",
|
||||
"text": {
|
||||
"content": content
|
||||
}
|
||||
}
|
||||
return await self.send_message(message)
|
||||
|
||||
async def send_markdown(self, text: str) -> bool:
|
||||
"""发送Markdown消息
|
||||
|
||||
Args:
|
||||
text: Markdown文本
|
||||
|
||||
Returns:
|
||||
是否发送成功
|
||||
"""
|
||||
message = {
|
||||
"msgtype": "markdown",
|
||||
"markdown": {
|
||||
"text": text
|
||||
}
|
||||
}
|
||||
return await self.send_message(message)
|
||||
|
||||
async def send_link(self, title: str, text: str,
|
||||
message_url: str = "", btn_title: str = "查看详情") -> bool:
|
||||
"""发送链接消息
|
||||
|
||||
Args:
|
||||
title: 标题
|
||||
text: 文本内容
|
||||
message_url: 跳转URL
|
||||
btn_title: 按钮文字
|
||||
|
||||
Returns:
|
||||
是否发送成功
|
||||
"""
|
||||
message = {
|
||||
"msgtype": "link",
|
||||
"link": {
|
||||
"title": title,
|
||||
"text": text,
|
||||
"messageUrl": message_url,
|
||||
"btnTitle": btn_title
|
||||
}
|
||||
}
|
||||
return await self.send_message(message)
|
||||
|
||||
async def close(self):
|
||||
"""关闭HTTP客户端"""
|
||||
if self.client:
|
||||
await self.client.aclose()
|
||||
self.client = None
|
||||
|
||||
|
||||
# 全局消息发送器实例
|
||||
_sender_instance: Optional[MessageSender] = None
|
||||
|
||||
|
||||
def get_message_sender() -> MessageSender:
|
||||
"""获取全局消息发送器实例(单例模式)"""
|
||||
global _sender_instance
|
||||
if _sender_instance is None:
|
||||
_sender_instance = MessageSender()
|
||||
return _sender_instance
|
||||
|
||||
92
utils/parser.py
Normal file
92
utils/parser.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""指令解析器"""
|
||||
import re
|
||||
import logging
|
||||
from typing import Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CommandParser:
|
||||
"""指令解析器"""
|
||||
|
||||
# 指令映射表
|
||||
COMMAND_MAP = {
|
||||
# 骰娘
|
||||
'.r': 'dice',
|
||||
'.roll': 'dice',
|
||||
|
||||
# 石头剪刀布
|
||||
'.rps': 'rps',
|
||||
|
||||
# 运势占卜
|
||||
'.fortune': 'fortune',
|
||||
'.运势': 'fortune',
|
||||
|
||||
# 猜数字
|
||||
'.guess': 'guess',
|
||||
'.猜数字': 'guess',
|
||||
|
||||
# 问答
|
||||
'.quiz': 'quiz',
|
||||
'.问答': 'quiz',
|
||||
|
||||
# 帮助
|
||||
'.help': 'help',
|
||||
'.帮助': 'help',
|
||||
|
||||
# 统计
|
||||
'.stats': 'stats',
|
||||
'.统计': 'stats',
|
||||
}
|
||||
|
||||
# 机器人名称模式(用于从@消息中提取)
|
||||
AT_PATTERN = re.compile(r'@\s*\S+\s+(.+)', re.DOTALL)
|
||||
|
||||
@classmethod
|
||||
def parse(cls, content: str) -> Optional[Tuple[str, str]]:
|
||||
"""解析消息内容,提取游戏类型和指令
|
||||
|
||||
Args:
|
||||
content: 消息内容
|
||||
|
||||
Returns:
|
||||
(游戏类型, 完整指令) 或 None
|
||||
"""
|
||||
# 去除首尾空格
|
||||
content = content.strip()
|
||||
|
||||
# 尝试提取@后的内容
|
||||
at_match = cls.AT_PATTERN.search(content)
|
||||
if at_match:
|
||||
content = at_match.group(1).strip()
|
||||
|
||||
# 检查是否以指令开头
|
||||
for cmd_prefix, game_type in cls.COMMAND_MAP.items():
|
||||
if content.startswith(cmd_prefix):
|
||||
# 返回游戏类型和完整指令
|
||||
return game_type, content
|
||||
|
||||
# 没有匹配的指令
|
||||
logger.debug(f"未识别的指令: {content}")
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def extract_command_args(cls, command: str) -> Tuple[str, str]:
|
||||
"""提取指令和参数
|
||||
|
||||
Args:
|
||||
command: 完整指令,如 ".r 1d20" 或 ".guess 50"
|
||||
|
||||
Returns:
|
||||
(指令前缀, 参数部分)
|
||||
"""
|
||||
parts = command.split(maxsplit=1)
|
||||
cmd = parts[0] if parts else ""
|
||||
args = parts[1] if len(parts) > 1 else ""
|
||||
return cmd, args
|
||||
|
||||
@classmethod
|
||||
def is_help_command(cls, command: str) -> bool:
|
||||
"""判断是否为帮助指令"""
|
||||
return command.strip() in ['.help', '.帮助', 'help', '帮助']
|
||||
|
||||
74
utils/rate_limit.py
Normal file
74
utils/rate_limit.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""限流控制"""
|
||||
import time
|
||||
import logging
|
||||
from collections import deque
|
||||
from typing import Dict
|
||||
from config import MESSAGE_RATE_LIMIT
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
"""令牌桶限流器"""
|
||||
|
||||
def __init__(self, max_requests: int = MESSAGE_RATE_LIMIT, window: int = 60):
|
||||
"""初始化限流器
|
||||
|
||||
Args:
|
||||
max_requests: 时间窗口内最大请求数
|
||||
window: 时间窗口(秒)
|
||||
"""
|
||||
self.max_requests = max_requests
|
||||
self.window = window
|
||||
# 使用deque存储时间戳
|
||||
self.requests: deque = deque()
|
||||
logger.info(f"限流器已启用: {max_requests}条/{window}秒")
|
||||
|
||||
def is_allowed(self) -> bool:
|
||||
"""检查是否允许请求
|
||||
|
||||
Returns:
|
||||
是否允许
|
||||
"""
|
||||
current_time = time.time()
|
||||
|
||||
# 清理过期的请求记录
|
||||
while self.requests and self.requests[0] < current_time - self.window:
|
||||
self.requests.popleft()
|
||||
|
||||
# 检查是否超过限制
|
||||
if len(self.requests) < self.max_requests:
|
||||
self.requests.append(current_time)
|
||||
return True
|
||||
else:
|
||||
logger.warning(f"触发限流: 已达到 {self.max_requests}条/{self.window}秒")
|
||||
return False
|
||||
|
||||
def get_remaining(self) -> int:
|
||||
"""获取剩余可用次数"""
|
||||
current_time = time.time()
|
||||
|
||||
# 清理过期的请求记录
|
||||
while self.requests and self.requests[0] < current_time - self.window:
|
||||
self.requests.popleft()
|
||||
|
||||
return max(0, self.max_requests - len(self.requests))
|
||||
|
||||
def get_reset_time(self) -> float:
|
||||
"""获取重置时间(秒)"""
|
||||
if not self.requests:
|
||||
return 0
|
||||
|
||||
oldest_request = self.requests[0]
|
||||
reset_time = oldest_request + self.window - time.time()
|
||||
return max(0, reset_time)
|
||||
|
||||
|
||||
# 全局限流器实例(单例)
|
||||
_rate_limiter: RateLimiter = RateLimiter()
|
||||
|
||||
|
||||
def get_rate_limiter() -> RateLimiter:
|
||||
"""获取全局限流器实例"""
|
||||
return _rate_limiter
|
||||
|
||||
Reference in New Issue
Block a user