from PWF.Convention.Runtime.Config import * from PWF.CoreModules.plugin_interface import PluginInterface from PWF.CoreModules.flags import * from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ProjectConfig from PWF.Convention.Runtime.Web import ToolURL from PWF.Convention.Runtime.String import LimitStringLength from fastapi.responses import HTMLResponse from dataclasses import dataclass, field import httpx import re logger: ProjectConfig = Architecture.Get(ProjectConfig) MAIN_WEBHOOK_URL = logger.FindItem("main_webhook_url", "") logger.SaveProperties() class GuideEntry(TypedDict, total=False): """单条图鉴信息。""" title: str identifier: str description: str category: str metadata: Dict[str, str] icon: str badge: str links: Sequence[Dict[str, str]] tags: Sequence[str] details: Sequence[Union[str, Dict[str, Any]]] group: str @dataclass(frozen=True) class GuideSection: """图鉴章节。""" title: str entries: Sequence[GuideEntry] = field(default_factory=tuple) description: str = "" layout: str = "grid" section_id: str | None = None @dataclass(frozen=True) class GuidePage: """完整图鉴页面。""" title: str sections: Sequence[GuideSection] = field(default_factory=tuple) subtitle: str = "" metadata: Dict[str, str] = field(default_factory=dict) related_links: Dict[str, Sequence[Dict[str, str]]] = field(default_factory=dict) def render_markdown_page(page: GuidePage) -> str: """保留 Markdown 渲染(备用)。""" def _render_section(section: GuideSection) -> str: lines: List[str] = [f"## {section.title}"] if section.description: lines.append(section.description) if not section.entries: lines.append("> 暂无内容。") return "\n".join(lines) for entry in section.entries: title = entry.get("title", "未命名") identifier = entry.get("identifier") desc = entry.get("description", "") category = entry.get("category") metadata = entry.get("metadata", {}) bullet = f"- **{title}**" if identifier: bullet += f"|`{identifier}`" if category: bullet += f"|{category}" lines.append(bullet) if desc: lines.append(f" - {desc}") for meta_key, meta_value in metadata.items(): lines.append(f" - {meta_key}:{meta_value}") return "\n".join(lines) lines: List[str] = [f"# {page.title}"] if page.subtitle: lines.append(page.subtitle) if page.metadata: lines.append("") for key, value in page.metadata.items(): lines.append(f"- {key}:{value}") for section in page.sections: lines.append("") lines.append(_render_section(section)) return "\n".join(lines) def render_html_page(page: GuidePage) -> str: """渲染 Apple Store 风格的 HTML 页面。""" def escape(text: Optional[str]) -> str: if not text: return "" return ( text.replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) ) def render_metadata(metadata: Dict[str, str]) -> str: if not metadata: return "" cards = [] for key, value in metadata.items(): cards.append( f"""
{escape(key)}
{escape(value)}
""" ) return f'
{"".join(cards)}
' def render_links(links: Optional[Sequence[Dict[str, str]]]) -> str: if not links: return "" items = [] for link in links: href = escape(link.get("href", "#")) label = escape(link.get("label", "前往")) items.append(f'{label}') return "".join(items) def render_tags(tags: Optional[Sequence[str]]) -> str: if not tags: return "" chips = "".join(f'{escape(tag)}' for tag in tags) return f'
{chips}
' def render_details(details: Optional[Sequence[Union[str, Dict[str, Any]]]]) -> str: if not details: return "" blocks: List[str] = [] for detail in details: if isinstance(detail, str): blocks.append(f'

{escape(detail)}

') elif isinstance(detail, dict): kind = detail.get("type") if kind == "list": items = "".join( f'
  • {escape(str(item))}
  • ' for item in detail.get("items", []) ) blocks.append(f'') elif kind == "steps": items = "".join( f'
  • {idx+1}{escape(str(item))}
  • ' for idx, item in enumerate(detail.get("items", [])) ) blocks.append(f'
      {items}
    ') elif kind == "table": rows = [] for row in detail.get("rows", []): cols = "".join(f"{escape(str(col))}" for col in row) rows.append(f"{cols}") head = "" headers = detail.get("headers") if headers: head = "".join(f"{escape(str(col))}" for col in headers) head = f"{head}" blocks.append( f'{head}{"".join(rows)}
    ' ) if not blocks: return "" return f'
    {"".join(blocks)}
    ' def render_entry(entry: GuideEntry) -> str: icon = escape(entry.get("icon")) badge = escape(entry.get("badge")) title = escape(entry.get("title")) identifier = escape(entry.get("identifier")) description = escape(entry.get("description")) category = escape(entry.get("category")) metadata_items = [] for meta_key, meta_value in entry.get("metadata", {}).items(): metadata_items.append( f'
  • {escape(meta_key)}{escape(str(meta_value))}
  • ' ) metadata_html = "" if metadata_items: metadata_html = f'' identifier_html = f'{identifier}' if identifier else "" category_html = f'{category}' if category else "" badge_html = f'{badge}' if badge else "" icon_html = f'
    {icon}
    ' if icon else "" links_html = render_links(entry.get("links")) tags_html = render_tags(entry.get("tags")) details_html = render_details(entry.get("details")) group = escape(entry.get("group")) group_attr = f' data-group="{group}"' if group else "" return f"""
    {icon_html}

    {title}{badge_html}

    {description}

    {metadata_html} {tags_html} {details_html} {links_html}
    """ def render_section(section: GuideSection) -> str: layout_class = "entries-grid" if section.layout == "grid" else "entries-list" section_attr = f' id="{escape(section.section_id)}"' if section.section_id else "" cards = "".join(render_entry(entry) for entry in section.entries) description_html = ( f'

    {escape(section.description)}

    ' if section.description else "" ) if not cards: cards = '
    暂无内容
    ' return f"""

    {escape(section.title)}

    {description_html}
    {cards}
    """ def render_related(related: Dict[str, Sequence[Dict[str, str]]]) -> str: if not related: return "" blocks: List[str] = [] for label, links in related.items(): if not links: continue items = "".join( f'{escape(link.get("label", ""))}' for link in links ) blocks.append( f""" """ ) if not blocks: return "" return f'' sections_html = "".join(render_section(section) for section in page.sections) metadata_html = render_metadata(page.metadata) related_html = render_related(page.related_links) subtitle_html = f'

    {escape(page.subtitle)}

    ' if page.subtitle else "" return f""" {escape(page.title)}

    {escape(page.title)}

    {subtitle_html}
    {metadata_html} {related_html} {sections_html}
    """ class MessageSender: """消息发送器""" def __init__(self, webhook_url: str): """初始化消息发送器 Args: webhook_url: Webhook URL """ self.webhook_url = webhook_url self.client: Optional[httpx.AsyncClient] = None async def _get_client(self) -> httpx.AsyncClient: """获取HTTP客户端(懒加载)""" if self.client is None: self.client = httpx.AsyncClient(timeout=10.0) return self.client async def send_message(self, message: Dict[str, Any]) -> bool: """发送消息到WPS Args: message: 消息字典 Returns: 是否发送成功 """ try: client = await self._get_client() response = await client.post(self.webhook_url, json=message) if response.status_code == 200: logger.Log("Info", f"消息发送成功: {message.get('msgtype')}") return True else: logger.Log("Error", f"消息发送失败: status={response.status_code}, body={response.text}") return False except Exception as e: logger.Log("Error", f"发送消息异常: {e}") return False async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool: """发送文本消息 Args: content: 文本内容 at_user_id: @用户ID(可选) Returns: 是否发送成功 """ # 如果需要@人 if at_user_id: content = f' {content}' message = { "msgtype": "text", "text": { "content": content } } return await self.send_message(message) async def send_markdown(self, text: str) -> bool: """发送Markdown消息 Args: text: Markdown文本 Returns: 是否发送成功 """ message = { "msgtype": "markdown", "markdown": { "text": text } } return await self.send_message(message) async def send_link(self, title: str, text: str, message_url: str = "", btn_title: str = "查看详情") -> bool: """发送链接消息 Args: title: 标题 text: 文本内容 message_url: 跳转URL btn_title: 按钮文字 Returns: 是否发送成功 """ message = { "msgtype": "link", "link": { "title": title, "text": text, "messageUrl": message_url, "btnTitle": btn_title } } return await self.send_message(message) async def close(self): """关闭HTTP客户端""" if self.client: await self.client.aclose() self.client = None class BasicWPSInterface(PluginInterface): user_id_to_username: Optional[Callable[[int], str]] = None @override def is_enable_plugin(self) -> bool: return False def get_webhook_url(self, message: str, chat_id: int, user_id: int) -> str: ''' 根据消息和用户ID获取Webhook URL, 返回空字符串表示不需要回复消息 Args: message: 消息内容 chat_id: 聊天ID user_id: 用户ID Returns: Webhook URL ''' return "" def get_webhook_request(self, data:Any|None) -> None: pass def get_message_sender(self, webhook_url: str) -> MessageSender: return MessageSender(webhook_url) def get_message_sender_type(self) -> Literal["text", "markdown", "link"]: return "markdown" def get_message_sender_function(self, webhook_url: str, type: Literal["text", "markdown", "link"]) -> Coroutine[Any, Any, bool]: if type == "text": return self.get_message_sender(webhook_url).send_text elif type == "markdown": return self.get_message_sender(webhook_url).send_markdown elif type == "link": return self.get_message_sender(webhook_url).send_link else: raise ValueError(f"Invalid message sender type: {type}") def parse_message_after_at(self, message: str) -> str: ''' 已过时 ''' return message async def send_markdown_message(self, message: str, chat_id: int, user_id: int) -> str|None: webhook_url = self.get_webhook_url(message, chat_id, user_id) if get_internal_debug(): logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Chat ID: {chat_id}, User ID: {user_id}") if webhook_url == "" or webhook_url == None: return None username: str = self.user_id_to_username(user_id) if self.user_id_to_username else "" send_message = f"""## {username} --- {message} """ result = await self.get_message_sender_function(webhook_url, self.get_message_sender_type())(send_message) if get_internal_verbose(): logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Result: {result}") return None @override async def callback(self, message: str, chat_id: int, user_id: int) -> str|None: message = self.parse_message_after_at(message) if message == "": return None return await self.send_markdown_message(message, chat_id, user_id) class WPSAPI(BasicWPSInterface): """核心 WPS 插件基类,提供图鉴模板设施。""" guide_section_labels: Dict[str, str] = { "commands": "指令一览", "items": "物品与资源", "recipes": "配方与合成", "guides": "系统指引", } def get_guide_title(self) -> str: return self.__class__.__name__ def get_guide_subtitle(self) -> str: return "" def get_guide_metadata(self) -> Dict[str, str]: return {} def collect_command_entries(self) -> Sequence[GuideEntry]: return () def collect_item_entries(self) -> Sequence[GuideEntry]: return () def collect_recipe_entries(self) -> Sequence[GuideEntry]: return () def collect_guide_entries(self) -> Sequence[GuideEntry]: return () def collect_additional_sections(self) -> Sequence[GuideSection]: return () def collect_guide_sections(self) -> Sequence[GuideSection]: sections: List[GuideSection] = [] command_entries = tuple(self.collect_command_entries()) if command_entries: sections.append( GuideSection( title=self.guide_section_labels["commands"], entries=command_entries, layout="list", ) ) item_entries = tuple(self.collect_item_entries()) if item_entries: sections.append( GuideSection( title=self.guide_section_labels["items"], entries=item_entries, ) ) recipe_entries = tuple(self.collect_recipe_entries()) if recipe_entries: sections.append( GuideSection( title=self.guide_section_labels["recipes"], entries=recipe_entries, ) ) guide_entries = tuple(self.collect_guide_entries()) if guide_entries: sections.append( GuideSection( title=self.guide_section_labels["guides"], entries=guide_entries, layout="list", ) ) additional_sections = tuple(self.collect_additional_sections()) if additional_sections: sections.extend(additional_sections) return tuple(sections) def build_guide_page(self) -> GuidePage: metadata: Dict[str, str] = {} for key, value in self.get_guide_metadata().items(): metadata[key] = str(value) related = self.get_related_links() return GuidePage( title=self.get_guide_title(), subtitle=self.get_guide_subtitle(), sections=self.collect_guide_sections(), metadata=metadata, related_links=related, ) def render_guide_page(self, page: GuidePage) -> str: return render_html_page(page) def render_guide(self) -> str: return self.render_guide_page(self.build_guide_page()) def get_guide_response(self, content: str) -> HTMLResponse: return HTMLResponse(content) @override def generate_router_illustrated_guide(self): async def handler() -> HTMLResponse: return self.get_guide_response(self.render_guide()) return handler def get_related_links(self) -> Dict[str, Sequence[Dict[str, str]]]: links: Dict[str, Sequence[Dict[str, str]]] = {} parents = [] for base in self.__class__.__mro__[1:]: if not issubclass(base, WPSAPI): continue if base.__module__.startswith("Plugins."): parents.append(base) if base is WPSAPI: break if parents: parents_links = [self._build_class_link(cls) for cls in reversed(parents)] links["父类链"] = tuple(filter(None, parents_links)) child_links = [self._build_class_link(child) for child in self._iter_subclasses(self.__class__)] child_links = [link for link in child_links if link] if child_links: links["子类"] = tuple(child_links) return links def _build_class_link(self, cls: type) -> Optional[Dict[str, str]]: if not hasattr(cls, "__module__") or not cls.__module__.startswith("Plugins."): return None path = f"/api/{cls.__name__}" return { "label": cls.__name__, "href": path, } def _iter_subclasses(self, cls: type) -> List[type]: collected: List[type] = [] for subclass in cls.__subclasses__(): if not issubclass(subclass, WPSAPI): continue if not subclass.__module__.startswith("Plugins."): continue collected.append(subclass) collected.extend(self._iter_subclasses(subclass)) return collected def get_guide_subtitle(self) -> str: return "核心 Webhook 转发插件" def get_guide_metadata(self) -> Dict[str, str]: return { "Webhook 状态": "已配置" if MAIN_WEBHOOK_URL else "未配置", } def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "say", "identifier": "say", "description": "将后续消息内容以 Markdown 形式发送到主 Webhook。", "metadata": {"别名": "说"}, "icon": "🗣️", "badge": "核心", }, ) def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "Webhook 绑定", "description": ( "在项目配置中设置 `main_webhook_url` 后插件自动启用," "所有注册的命令将调用 `send_markdown_message` 发送富文本。" ), "icon": "🔗", }, { "title": "消息格式", "description": ( "默认使用 Markdown 模式发送,支持 `聊天ID` 与 `用户ID` 的 @ 提醒。" ), "icon": "📝", }, ) @override def is_enable_plugin(self) -> bool: if MAIN_WEBHOOK_URL == "": logger.Log("Error", f"{ConsoleFrontColor.RED}WPSAPI未配置主Webhook URL{ConsoleFrontColor.RESET}") return MAIN_WEBHOOK_URL != "" def get_main_webhook_url(self) -> str: return MAIN_WEBHOOK_URL @override def get_webhook_url(self, message: str, chat_id: int, user_id: int) -> str: webhook_url = ProjectConfig().GetFile(f"webhook_url/{chat_id}",True).LoadAsText() if webhook_url == "": webhook_url = self.get_main_webhook_url() return webhook_url @override def get_webhook_request(self, data:Any|None) -> None: return None @override def wake_up(self) -> None: logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPI核心插件已加载{ConsoleFrontColor.RESET}") self.register_plugin("say") self.register_plugin("说") class WPSAPIHelp(WPSAPI): @override def dependencies(self) -> List[Type]: return [WPSAPI] @override def is_enable_plugin(self) -> bool: return True @override async def callback(self, message: str, chat_id: int, user_id: int) -> str|None: mapper: Dict[str, List[str]] = {} for key in PluginInterface.plugin_instances.keys(): plugin = PluginInterface.plugin_instances.get(key) if plugin: if plugin.__class__.__name__ not in mapper: mapper[plugin.__class__.__name__] = [] mapper[plugin.__class__.__name__].append(key) desc = "" for plugin_name, keys in mapper.items(): desc += f"- {plugin_name}: {", ".join(keys)}\n" return await self.send_markdown_message(f"""# 指令入口 {desc} """, chat_id, user_id) @override def wake_up(self) -> None: logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPIHelp 插件已加载{ConsoleFrontColor.RESET}") self.register_plugin("help") self.register_plugin("帮助") class WPSAPIWebhook(WPSAPI): @override def dependencies(self) -> List[Type]: return [WPSAPI] @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPIWebhook 插件已加载{ConsoleFrontColor.RESET}") self.register_plugin("chat_url_register") self.register_plugin("会话注册") @override async def callback(self, message: str, chat_id: int, user_id: int) -> str|None: ProjectConfig().GetFile(f"webhook_url/{chat_id}",True).SaveAsText(message) return await self.send_markdown_message(f"会话注册成功", chat_id, user_id) logger.SaveProperties()