from __future__ import annotations import asyncio import time from PWF.Convention.Runtime import Config from PWF.Convention.Runtime.Config import * from pydantic import BaseModel, Field from PWF.Convention.Runtime.EasySave import EasySave, EasySaveSetting from PWF.Convention.Runtime.File import ToolFile from PWF.Convention.Runtime.GlobalConfig import ProjectConfig from .WPSAPI import BasicWPSInterface, LimitStringLength, get_internal_debug, WPSAPI config = ProjectConfig() class UserConfigEntry(BaseModel): chat_id: int user_id: int data: Dict[str, Any] = Field(default_factory=dict) updated_at: int = Field(default_factory=lambda: int(time.time())) class UserConfigStore(BaseModel): entries: List[UserConfigEntry] = Field(default_factory=list) class WPSConfigPlugin(BasicWPSInterface): def __init__(self) -> None: super().__init__() self._lock = asyncio.Lock() self._store = UserConfigStore() self._entries: Dict[Tuple[int, int], UserConfigEntry] = {} self._setting = self._build_setting() self._load_store() @override def dependencies(self) -> List[Type]: return [WPSAPI] def _build_setting(self) -> ToolFile: #EasySaveSetting: data_file: ToolFile = config.GetFile("user_configs.easysave", False) data_file.TryCreateParentPath() if data_file.Exists() == False: data_file.SaveAsJson(self._store) return data_file @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: config.Log("Info", "WPSConfigPlugin 插件已加载") self.register_plugin("config") self.register_plugin("cfg") async def do_callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: tokens = [token.strip() for token in message.strip().split() if token.strip()] if not tokens: return self._help_message() action = tokens[0].lower() if action == "set" and len(tokens) >= 3: key = tokens[1].lower() value = " ".join(tokens[2:]).strip() return await self._handle_set(chat_id, user_id, key, value) if action == "get" and len(tokens) >= 2: key = tokens[1].lower() return await self._handle_get(chat_id, user_id, key) # if action in {"adjust", "add", "sub"} and len(tokens) >= 3: # key = tokens[1].lower() # delta_token = tokens[2] # reason = " ".join(tokens[3:]).strip() if len(tokens) > 3 else "" # signed_delta = self._parse_delta(action, delta_token) # if signed_delta is None: # return "❌ 积分变更失败: delta 需要是整数" # return await self._handle_adjust(chat_id, user_id, key, signed_delta, reason) return self._help_message() @override async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: message = self.parse_message_after_at(message) message = await self.do_callback(message, chat_id, user_id) return await self.send_markdown_message(message, chat_id, user_id) # def _parse_delta(self, action: str, token: str) -> Optional[int]: # try: # delta = int(token) # except ValueError: # return None # # if action == "add": # return abs(delta) # if action == "sub": # return -abs(delta) # return delta async def _handle_set(self, chat_id: int, user_id: int, key: str, value: str) -> str: if key == "user.name": if not value: return "❌ 用户名不能为空" if len(value) > 40: return "❌ 用户名长度不能超过40个字符" await self._set_value(chat_id, user_id, key, value) return f"✅ 已设置用户名为 {value}" if key == "user.url": if not value: return "❌ 个人URL不能为空" lowered = value.lower() if not (lowered.startswith("http://") or lowered.startswith("https://")): return "❌ 个人URL 必须以 http:// 或 https:// 开头" await self._set_value(chat_id, user_id, key, value) return "✅ 已更新个人URL" # if key == "user.point": # try: # # points = int(value) # return "❌ 无权限设置积分" # except ValueError: # return "❌ 积分必须是整数" # await self._set_value(chat_id, user_id, key, points) # return f"✅ 已将积分设置为 {points}" # return "❌ 未识别的配置键,请使用 user.name / user.url / user.point" return "❌ 未识别的配置键,请使用 user.name / user.url" async def _handle_get(self, chat_id: int, user_id: int, key: str) -> str: entry = self._get_entry(chat_id, user_id, create=False) if entry is None or key not in entry.data: if key == "user.point": return "当前积分: 0" return f"⚠️ 尚未设置 {key}" value = entry.data.get(key) if key == "user.point": return f"当前积分: {int(value)}" return f"当前 {key} = {value}" async def _handle_adjust(self, chat_id: int, user_id: int, key: str, delta: int, reason: str) -> str: if key != "user.point": return "❌ 仅支持调整 user.point" new_value = await self._adjust_points(chat_id, user_id, delta, reason) prefix = "增加" if delta >= 0 else "扣除" detail = f"{prefix} {abs(delta)}" if delta != 0 else "未变动" return f"✅ {detail},当前积分: {new_value}" async def _set_value(self, chat_id: int, user_id: int, key: str, value: Any) -> None: entry = self._get_entry(chat_id, user_id, create=True) entry.data[key] = value entry.updated_at = int(time.time()) await self._save_store() async def _adjust_points(self, chat_id: int, user_id: int, delta: int, reason: str) -> int: entry = self._get_entry(chat_id, user_id, create=True) current = self._coerce_int(entry.data.get("user.point", 0)) new_value = current + delta entry.data["user.point"] = new_value history: List[Dict[str, Any]] = entry.data.setdefault("user.point_history", []) # type: ignore[assignment] history.append({ "delta": delta, "reason": reason, "timestamp": int(time.time()), }) if len(history) > 100: entry.data["user.point_history"] = history[-100:] entry.updated_at = int(time.time()) await self._save_store() return new_value def _get_entry(self, chat_id: int, user_id: int, create: bool) -> Optional[UserConfigEntry]: key = (chat_id, user_id) if key in self._entries: return self._entries[key] if not create: return None entry = UserConfigEntry(chat_id=chat_id, user_id=user_id) self._entries[key] = entry self._store.entries.append(entry) return entry def _ignore_lock_save_store(self) -> None: self._store.entries = list(self._entries.values()) try: #EasySave.Write(self._store, setting=self._setting) self._setting.SaveAsJson(self._store.model_dump()) except Exception as exc: config.Log("Error", f"ConfigPlugin 保存失败: {exc}") raise async def _save_store(self) -> None: async with self._lock: self._ignore_lock_save_store() def _load_store(self) -> None: try: store = UserConfigStore.model_validate(self._setting.LoadAsJson()) self._store = store or UserConfigStore() except FileNotFoundError: self._store = UserConfigStore() except Exception as exc: config.Log("Error", f"{traceback.format_exc()}") self._store = UserConfigStore() self._entries = { (entry.chat_id, entry.user_id): entry for entry in self._store.entries } def _coerce_int(self, value: Any) -> int: try: return int(value) except (TypeError, ValueError): return 0 def get_user_name(self, chat_id: int, user_id: int) -> Optional[str]: entry = self._entries.get((chat_id, user_id)) if not entry: return None value = entry.data.get("user.name") return str(value) if value is not None else None def get_user_url(self, chat_id: int, user_id: int) -> Optional[str]: entry = self._entries.get((chat_id, user_id)) if not entry: return None value = entry.data.get("user.url") return str(value) if value is not None else None def get_user_points(self, chat_id: int, user_id: int) -> int: entry = self._entries.get((chat_id, user_id)) if not entry: return 0 return self._coerce_int(entry.data.get("user.point", 0)) async def adjust_user_points(self, chat_id: int, user_id: int, delta: int, reason: str = "") -> int: return await self._adjust_points(chat_id, user_id, delta, reason) def _help_message(self) -> str: return ( "🛠️ Config 命令帮助:\n" "config set user.name <用户名>\n" "config set user.url \n" #"config set user.point <整数>\n" "config get user.name|user.url|user.point\n" #"config adjust user.point <整数> [原因]\n" #"config add user.point <整数> [原因]\n" #"config sub user.point <整数> [原因]" ) config.SaveProperties() __all__ = ["WPSConfigPlugin"]