"""Garden service handling planting, harvesting, stealing, and selling.""" from __future__ import annotations import json import random from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from PWF.CoreModules.database import get_db from PWF.CoreModules.plugin_interface import PluginInterface from PWF.CoreModules.flags import get_internal_debug from pydantic import BaseModel from .garden_models import ( GARDEN_CROPS, GARDEN_FRUITS, GARDEN_MISC_ITEMS, GARDEN_TRAPS_DICT, GardenCropDefinition, GardenTrapDefinition, get_garden_db_models, ) Timestamp = str def _local_now() -> datetime: return datetime.now() def _parse_local_iso(ts: str) -> datetime: return datetime.fromisoformat(ts) class GardenConfig(BaseModel): max_plots: int sale_multiplier: int fortune_coeff: float theft_threshold_ratio: float seed_store_limit: int crops_config_path: str class Config: allow_mutation = False @classmethod def load(cls) -> "GardenConfig": project_config: ProjectConfig = Architecture.Get(ProjectConfig) max_plots = int(project_config.FindItem("garden_max_plots_per_user", 4)) sale_multiplier = int(project_config.FindItem("garden_sale_multiplier", 10)) fortune_coeff = float(project_config.FindItem("garden_fortune_coeff", 0.03)) theft_ratio = float(project_config.FindItem("garden_theft_threshold_ratio", 0.5)) seed_store_limit = int(project_config.FindItem("garden_seed_store_limit", 5)) crops_config_path = str(project_config.FindItem("garden_crops_config_path", "Plugins/garden_crops.json")) project_config.SaveProperties() return cls( max_plots=max_plots, sale_multiplier=sale_multiplier, fortune_coeff=fortune_coeff, theft_threshold_ratio=theft_ratio, seed_store_limit=seed_store_limit, crops_config_path=crops_config_path, ) class GardenService: def __init__(self) -> None: self._config = GardenConfig.load() self._db = get_db() self._logger: ProjectConfig = Architecture.Get(ProjectConfig) @property def config(self) -> GardenConfig: return self._config # region Query helpers def list_plots(self, user_id: int) -> List[Dict[str, object]]: cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM garden_plots WHERE user_id = ? ORDER BY plot_index ASC", (user_id,), ) rows = cursor.fetchall() return [dict(row) for row in rows] def get_plot(self, user_id: int, plot_index: int) -> Optional[Dict[str, object]]: cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM garden_plots WHERE user_id = ? AND plot_index = ?", (user_id, plot_index), ) row = cursor.fetchone() return dict(row) if row else None # endregion # region Planting def plant( self, *, user_id: int, chat_id: int, seed_id: str, plot_index: Optional[int] = None, register_callback: Optional[ Tuple[PluginInterface, str] ] = None, ) -> Tuple[int, datetime]: crop = GARDEN_CROPS.get(seed_id) if not crop: raise ValueError("未知的种子") plots = self.list_plots(user_id) used_indices = {int(plot["plot_index"]) for plot in plots} if len(used_indices) >= self._config.max_plots: raise ValueError("没有空闲土地") if plot_index is None: for idx in range(1, self._config.max_plots + 1): if idx not in used_indices: plot_index = idx break if plot_index is None: raise ValueError("无法分配地块") planted_at = _local_now() mature_at = planted_at + timedelta(minutes=crop.growth_minutes) debug_mode = get_internal_debug() if debug_mode: mature_at = planted_at cursor = self._db.conn.cursor() cursor.execute( """ INSERT INTO garden_plots ( user_id, chat_id, plot_index, seed_id, seed_quality, planted_at, mature_at, is_mature, base_yield, extra_type, extra_payload, remaining_fruit, theft_users, scheduled_task_id ) VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?, ?, NULL) """, ( user_id, chat_id, plot_index, crop.seed_id, crop.tier, planted_at.isoformat(), mature_at.isoformat(), crop.base_yield, crop.extra_reward.kind, json.dumps(crop.extra_reward.payload) if crop.extra_reward else None, crop.base_yield, json.dumps([]), ), ) self._db.conn.commit() task_id = None if register_callback: plugin, callback_name = register_callback delay_ms = 0 if debug_mode else int(crop.growth_minutes * 60 * 1000) task_id = plugin.register_clock( getattr(plugin, callback_name), delay_ms, kwargs={"user_id": user_id, "chat_id": chat_id, "plot_index": plot_index}, ) cursor.execute( "UPDATE garden_plots SET scheduled_task_id = ? WHERE user_id = ? AND plot_index = ?", (task_id, user_id, plot_index), ) self._db.conn.commit() return plot_index, mature_at.isoformat() def mark_mature(self, user_id: int, plot_index: int) -> None: cursor = self._db.conn.cursor() cursor.execute( "UPDATE garden_plots SET is_mature = 1, scheduled_task_id = NULL WHERE user_id = ? AND plot_index = ?", (user_id, plot_index), ) self._db.conn.commit() # endregion # region Harvest def harvest(self, *, user_id: int, plot_index: int, fortune_value: float) -> Dict[str, object]: plot = self.get_plot(user_id, plot_index) if not plot: raise ValueError("指定地块不存在") if int(plot["is_mature"]) != 1: raise ValueError("作物尚未成熟") crop = GARDEN_CROPS.get(plot["seed_id"]) if not crop: raise ValueError("未知作物") initial_yield = int(plot["base_yield"]) remaining_fruit = int(plot["remaining_fruit"]) if remaining_fruit < 0: remaining_fruit = 0 extra_reward = None if crop.extra_reward: base_rate = crop.extra_reward.base_rate probability = max( 0.0, min(1.0, base_rate + fortune_value * self._config.fortune_coeff), ) if random.random() <= probability: if crop.extra_reward.kind == "points": data = crop.extra_reward.payload max_points = min( data.get("max", initial_yield * crop.seed_price), crop.seed_price * self._config.sale_multiplier, ) min_points = data.get("min", 0) if max_points > 0: amount = random.randint(min_points, max_points) extra_reward = {"type": "points", "amount": amount} elif crop.extra_reward.kind == "item" and crop.extra_item_id: data = crop.extra_reward.payload min_qty = max(0, data.get("min", 0)) max_qty = max(min_qty, data.get("max", min_qty)) if max_qty > 0: quantity = random.randint(min_qty, max_qty) extra_reward = { "type": "item", "item_id": crop.extra_item_id, "quantity": quantity, } result = { "crop": crop, "base_yield": remaining_fruit, "extra": extra_reward, } result["initial_yield"] = initial_yield cursor = self._db.conn.cursor() cursor.execute( "DELETE FROM garden_plots WHERE user_id = ? AND plot_index = ?", (user_id, plot_index), ) self._db.conn.commit() return result def clear_plot(self, *, user_id: int, plot_index: int) -> bool: cursor = self._db.conn.cursor() cursor.execute( "DELETE FROM garden_plots WHERE user_id = ? AND plot_index = ?", (user_id, plot_index), ) deleted = cursor.rowcount > 0 if deleted: self._db.conn.commit() return deleted # endregion # region Steal def _is_theft_banned(self, user_id: int) -> Tuple[bool, Optional[str]]: """检查用户是否被禁止偷盗 Returns: (是否被禁止, 如果被禁止则返回解封时间字符串,否则为None) """ cursor = self._db.conn.cursor() cursor.execute( "SELECT banned_until FROM garden_theft_ban WHERE user_id = ?", (user_id,), ) row = cursor.fetchone() if not row: return False, None banned_until_str = row["banned_until"] banned_until = _parse_local_iso(banned_until_str) now = _local_now() if banned_until > now: return True, banned_until_str else: # 已过期,删除记录 cursor.execute("DELETE FROM garden_theft_ban WHERE user_id = ?", (user_id,)) self._db.conn.commit() return False, None def _ban_theft(self, user_id: int, ban_hours: int) -> str: """禁止用户偷盗一定时间 Returns: 解封时间字符串 """ banned_until = _local_now() + timedelta(hours=ban_hours) banned_until_str = banned_until.isoformat() cursor = self._db.conn.cursor() cursor.execute( """ INSERT INTO garden_theft_ban (user_id, banned_until) VALUES (?, ?) ON CONFLICT(user_id) DO UPDATE SET banned_until = excluded.banned_until """, (user_id, banned_until_str), ) self._db.conn.commit() return banned_until_str def _check_trap(self, plot: Dict[str, object], thief_id: int) -> Optional[Dict[str, object]]: """检查并触发陷阱 Returns: 如果触发陷阱,返回陷阱信息字典;否则返回None """ trap_item_id = plot.get("trap_item_id") if not trap_item_id: return None # 检查陷阱耐久度 trap_durability = int(plot.get("trap_durability", 0)) if trap_durability <= 0: return None trap = GARDEN_TRAPS_DICT.get(trap_item_id) if not trap: return None # 检查触发概率 if random.random() > trap.trigger_rate: return None # 触发陷阱:设置禁令 banned_until_str = self._ban_theft(thief_id, trap.ban_hours) # 减少耐久度 new_durability = trap_durability - 1 user_id = int(plot["user_id"]) plot_index = int(plot["plot_index"]) cursor = self._db.conn.cursor() if new_durability <= 0: # 耐久度归零,移除陷阱 cursor.execute( """ UPDATE garden_plots SET trap_item_id = NULL, trap_config = NULL, trap_durability = 0 WHERE user_id = ? AND plot_index = ? """, (user_id, plot_index), ) else: # 更新耐久度 cursor.execute( """ UPDATE garden_plots SET trap_durability = ? WHERE user_id = ? AND plot_index = ? """, (new_durability, user_id, plot_index), ) self._db.conn.commit() return { "trap": trap, "fine_points": trap.fine_points, "ban_hours": trap.ban_hours, "banned_until": banned_until_str, "durability": new_durability, "durability_exhausted": new_durability <= 0, "trigger_message": trap.trigger_message.format( fine=trap.fine_points, hours=trap.ban_hours, ), } def steal(self, *, thief_id: int, owner_id: int, plot_index: int) -> Dict[str, object]: plot = self.get_plot(owner_id, plot_index) if not plot: raise ValueError("目标地块不存在") if int(plot["is_mature"]) != 1: raise ValueError("目标作物尚未成熟") # 检查是否被禁止偷盗 is_banned, banned_until_str = self._is_theft_banned(thief_id) if is_banned: banned_until = _parse_local_iso(banned_until_str) now = _local_now() remaining_hours = (banned_until - now).total_seconds() / 3600 raise ValueError(f"你已被禁止偷盗,解封时间:{self.format_display_time(banned_until_str)}(剩余约{int(remaining_hours)}小时)") crop = GARDEN_CROPS.get(plot["seed_id"]) if not crop: raise ValueError("未知作物") remaining = int(plot["remaining_fruit"]) threshold = int(round(int(plot["base_yield"]) * self._config.theft_threshold_ratio)) if remaining <= threshold: raise ValueError("果实剩余不足,无法偷取") theft_users = set(json.loads(plot["theft_users"])) if thief_id in theft_users: raise ValueError("你已经偷取过该作物") # 检查陷阱(在偷盗之前检查) trap_result = self._check_trap(plot, thief_id) theft_users.add(thief_id) remaining -= 1 cursor = self._db.conn.cursor() cursor.execute( """ UPDATE garden_plots SET remaining_fruit = ?, theft_users = ? WHERE user_id = ? AND plot_index = ? """, ( remaining, json.dumps(list(theft_users)), owner_id, plot_index, ), ) self._db.conn.commit() result = { "crop": crop, "stolen_quantity": 1, "remaining": remaining, "chat_id": plot["chat_id"], "trap_result": trap_result, } return result # endregion # region Trap def place_trap(self, *, user_id: int, plot_index: int, trap_item_id: str) -> None: """在地块上放置陷阱""" plot = self.get_plot(user_id, plot_index) if not plot: raise ValueError("目标地块不存在") trap = GARDEN_TRAPS_DICT.get(trap_item_id) if not trap: raise ValueError("未知陷阱物品") # 构建陷阱配置(JSON格式) trap_config = json.dumps({ "item_id": trap.item_id, "display_name": trap.display_name, "tier": trap.tier, }) cursor = self._db.conn.cursor() cursor.execute( """ UPDATE garden_plots SET trap_item_id = ?, trap_config = ?, trap_durability = ? WHERE user_id = ? AND plot_index = ? """, (trap_item_id, trap_config, trap.durability, user_id, plot_index), ) self._db.conn.commit() def remove_trap(self, *, user_id: int, plot_index: int) -> None: """移除地块上的陷阱""" plot = self.get_plot(user_id, plot_index) if not plot: raise ValueError("目标地块不存在") cursor = self._db.conn.cursor() cursor.execute( """ UPDATE garden_plots SET trap_item_id = NULL, trap_config = NULL, trap_durability = 0 WHERE user_id = ? AND plot_index = ? """, (user_id, plot_index), ) self._db.conn.commit() # endregion # region Selling def sell_fruit(self, *, user_id: int, fruit_id: str, quantity: int) -> Tuple[int, int]: crop = GARDEN_FRUITS.get(fruit_id) if not crop: raise ValueError("未知果实") if quantity <= 0: raise ValueError("数量必须大于0") price_per = crop.seed_price * self._config.sale_multiplier total_points = price_per * quantity return total_points, price_per # endregion # region Maintenance def recover_overdue_plots(self) -> None: cursor = self._db.conn.cursor() cursor.execute( "SELECT user_id, plot_index, mature_at, is_mature FROM garden_plots WHERE is_mature = 0", ) rows = cursor.fetchall() now = _local_now() updated = 0 for row in rows: mature_at = _parse_local_iso(row["mature_at"]) if mature_at <= now: self.mark_mature(row["user_id"], row["plot_index"]) updated += 1 if updated: self._logger.Log( "Info", f"{ConsoleFrontColor.GREEN}同步成熟地块 {updated} 个{ConsoleFrontColor.RESET}", ) # endregion # region Utilities def format_display_time(self, iso_ts: str) -> str: try: dt = _parse_local_iso(iso_ts) return dt.strftime("%Y年%m月%d日 %H时%M分") except Exception: return iso_ts # endregion __all__ = ["GardenService", "GardenConfig", "get_garden_db_models"]