"""WPS消息构造和发送工具""" import httpx import logging from typing import Dict, Any, Optional from config import GetWebhookURL logger = logging.getLogger(__name__) class MessageSender: """消息发送器""" def __init__(self, webhook_url: Optional[str] = None): """初始化消息发送器 Args: webhook_url: Webhook URL """ if webhook_url is None: webhook_url = GetWebhookURL() self.webhook_url = webhook_url self.client: Optional[httpx.AsyncClient] = None async def _get_client(self) -> httpx.AsyncClient: """获取HTTP客户端(懒加载)""" if self.client is None: self.client = httpx.AsyncClient(timeout=10.0) return self.client async def send_message(self, message: Dict[str, Any]) -> bool: """发送消息到WPS Args: message: 消息字典 Returns: 是否发送成功 """ try: client = await self._get_client() response = await client.post(self.webhook_url, json=message) if response.status_code == 200: logger.info(f"消息发送成功: {message.get('msgtype')}") return True else: logger.error(f"消息发送失败: status={response.status_code}, body={response.text}") return False except Exception as e: logger.error(f"发送消息异常: {e}", exc_info=True) return False async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool: """发送文本消息 Args: content: 文本内容 at_user_id: @用户ID(可选) Returns: 是否发送成功 """ # 如果需要@人 if at_user_id: content = f' {content}' message = { "msgtype": "text", "text": { "content": content } } return await self.send_message(message) async def send_markdown(self, text: str) -> bool: """发送Markdown消息 Args: text: Markdown文本 Returns: 是否发送成功 """ message = { "msgtype": "markdown", "markdown": { "text": text } } return await self.send_message(message) async def send_link(self, title: str, text: str, message_url: str = "", btn_title: str = "查看详情") -> bool: """发送链接消息 Args: title: 标题 text: 文本内容 message_url: 跳转URL btn_title: 按钮文字 Returns: 是否发送成功 """ message = { "msgtype": "link", "link": { "title": title, "text": text, "messageUrl": message_url, "btnTitle": btn_title } } return await self.send_message(message) async def close(self): """关闭HTTP客户端""" if self.client: await self.client.aclose() self.client = None # 全局消息发送器实例 _sender_instance: Optional[MessageSender] = None def get_message_sender() -> MessageSender: """获取全局消息发送器实例(单例模式)""" global _sender_instance if _sender_instance is None: _sender_instance = MessageSender() else: # 更新Webhook URL以确保使用最新的值 _sender_instance.webhook_url = GetWebhookURL() return _sender_instance async def send_private_message(user_id: int, content: str, msg_type: str = 'text') -> bool: """发送私聊消息到用户个人webhook URL Args: user_id: 目标用户ID content: 消息内容 msg_type: 消息类型 ('text' 或 'markdown') Returns: 是否发送成功,如果用户没有个人URL则返回False """ from core.database import get_db # 从数据库获取用户webhook URL db = get_db() webhook_url = db.get_user_webhook_url(user_id) if not webhook_url: logger.warning(f"用户 {user_id} 没有注册个人webhook URL,无法发送私聊消息") return False # 创建MessageSender实例(使用用户的个人URL) sender = MessageSender(webhook_url=webhook_url) try: # 根据msg_type调用相应方法 if msg_type == 'markdown': return await sender.send_markdown(content) else: return await sender.send_text(content) except Exception as e: logger.error(f"发送私聊消息失败: user_id={user_id}, error={e}", exc_info=True) return False finally: # 关闭HTTP客户端 await sender.close()