from __future__ import annotations import hashlib from datetime import datetime from functools import lru_cache from typing import Any, Dict, List, Tuple, Type, override from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from .WPSAPI import WPSAPI logger: ProjectConfig = Architecture.Get(ProjectConfig) _HASH_BYTES = 16 _HASH_MAX = (1 << (_HASH_BYTES * 8)) - 1 _FORTUNE_STAGE_TABLE: List[Tuple[float, str]] = [ (-0.9, "厄运深谷"), (-0.7, "凶多吉少"), (-0.5, "多有波折"), (-0.3, "略显低迷"), (-0.1, "风平浪静"), (0.1, "小有起色"), (0.3, "渐入佳境"), (0.5, "好运上扬"), (0.7, "顺风顺水"), (0.9, "鸿运当头"), (1.01, "天命所归"), ] class WPSFortuneSystem(WPSAPI): """基于整点哈希的运势系统,可供其他模块复用""" @override def dependencies(self) -> List[Type]: return [WPSAPI] @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSFortuneSystem 插件已加载{ConsoleFrontColor.RESET}") self.register_plugin("运势") self.register_plugin("fortune") @override async def callback(self, message: str, chat_id: int, user_id: int) -> str | None: fortune_message = self._format_fortune_message(user_id) return await self.send_markdown_message(fortune_message, chat_id, user_id) def get_fortune_value(self, user_id: int, dt: datetime | None = None) -> float: hour_dt, hour_key = self._resolve_hour(dt) return self._compute_fortune_value(user_id, hour_key) def get_fortune_stage(self, user_id: int, dt: datetime | None = None) -> str: value = self.get_fortune_value(user_id, dt) return self._match_stage(value) def get_fortune_info(self, user_id: int, dt: datetime | None = None) -> Dict[str, Any]: hour_dt, hour_key = self._resolve_hour(dt) value = self._compute_fortune_value(user_id, hour_key) stage = self._match_stage(value) return { "value": value, "stage": stage, "hour_key": hour_key, "hour_label": hour_dt.strftime("%Y-%m-%d %H:00"), "timestamp": hour_dt.isoformat(), } def _resolve_hour(self, dt: datetime | None) -> Tuple[datetime, str]: target_dt = dt or datetime.now() hour_dt = target_dt.replace(minute=0, second=0, microsecond=0) return hour_dt, hour_dt.isoformat() def _format_fortune_message(self, user_id: int) -> str: info = self.get_fortune_info(user_id) value_display = f"{info['value']:.4f}" return ( "# 🎲 运势占卜\n" f"- 运势值:`{value_display}`\n" f"- 运势阶段:{info['stage']}\n" f"- 基准整点:{info['hour_label']}\n" "> 运势每整点刷新,允许运气加成的系统会复用同一结果。" ) def _match_stage(self, value: float) -> str: for upper_bound, label in _FORTUNE_STAGE_TABLE: if value < upper_bound: return label return _FORTUNE_STAGE_TABLE[-1][1] @staticmethod @lru_cache(maxsize=2048) def _cached_hash_value(user_id: int, hour_key: str) -> float: payload = f"{user_id}:{hour_key}".encode("utf-8") digest = hashlib.sha256(payload).digest()[:_HASH_BYTES] integer = int.from_bytes(digest, "big") normalized = integer / _HASH_MAX if _HASH_MAX else 0.0 mapped = normalized * 2 - 1 return max(-0.9999, min(0.9999, mapped)) def _compute_fortune_value(self, user_id: int, hour_key: str) -> float: try: return self._cached_hash_value(user_id, hour_key) except Exception as exc: logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}计算运势时出现异常: {exc}{ConsoleFrontColor.RESET}", ) return 0.0 __all__ = ["WPSFortuneSystem"]