diff --git a/Convention/Runtime/Config.md b/Convention/Runtime/Config.md deleted file mode 100644 index 8a2c2ca..0000000 --- a/Convention/Runtime/Config.md +++ /dev/null @@ -1,49 +0,0 @@ -[返回](./Runtime-README.md) - -# /Convention/Runtime/Config - ---- - -包含了关于静态的配置信息等内容, 并且引入全体标准库内容 - -## Import All - -检查并尝试引入所有依赖库 - -## 静态配置 - -若不存在相应配置, 则需要定义 - -- `CURRENT_COM_NAME` 公司/组织名称 -- `CURRENT_APP_NAME` 应用名称 - -## PlatformIndicator包含的内容 - -- `IsRelease` Debug/Release状态 -- `IsPlatform` 平台判断 -- `IsPlatformX64`平台架构判断 -- 编译器/解释器判断 如`IsMSVC`等 -- `ApplicationPath` 获取当前应用程序目录 -- `StreamingAssetsPath` 获取StreamingAssets目录 -- `PersistentPath` 获取持久化目录 -- `PlatformInfomation` 平台相关的发布信息 - -## 多个静态类 Indicator - -包含对应类型常用的工具函数 - -## 静态类 DescriptiveIndicator - -包含一个`描述`字符串, 可选一个`值`对象 - -## 其他基础内容 - -用于对齐不同语言间基本实现的颗粒度, 如以下内容 - -- 类型转换 -- 字符串操作 -- Construct/Destruct 重构造/析构 -- 命令行解析 -- 简单的反射内容 -- 元类型 -- 等等... diff --git a/Convention/Runtime/Config.py b/Convention/Runtime/Config.py index 528d553..5fcfe88 100644 --- a/Convention/Runtime/Config.py +++ b/Convention/Runtime/Config.py @@ -121,13 +121,13 @@ class ActionEvent[_Call:Callable]: return ex def CallFunc(self, index:int, *args, **kwargs) -> Union[Any, Exception]: return self.CallFuncWithoutCallIndexControl(self.call_indexs[index], *args, **kwargs) - def _inject_invoke(self, *args, **kwargs): + def _InjectInvoke(self, *args, **kwargs): result:List[Any] = [] - for index in range(self.call_max_count): + for index in range(self.CallMaxCount): result.append(self.CallFunc(index, *args, **kwargs)) return result def Invoke(self, *args, **kwargs) -> Union[Self, bool]: - self.last_result = self._inject_invoke(*args, **kwargs) + self.last_result = self._InjectInvoke(*args, **kwargs) return self def InitCallIndex(self): self.call_indexs = [i for i in range(len(self.__actions))] @@ -247,6 +247,10 @@ class lock_guard: self._locker.acquire() def __del__(self): self._locker.release() + def __enter__(self): + return + def __exit__(self,*args,**kwargs): + return True class global_lock_guard(lock_guard): def __init__(self): @@ -276,3 +280,63 @@ def Nowf() -> str: return: str ''' return datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + +true: Literal[True] = True +false: Literal[False] = False + +class PlatformIndicator: + IsRelease : bool = False + IsPlatformWindows : bool = sys.platform == "Windows" + IsPlatformLinux : bool = sys.platform == "Linux" + IsPlatformOsx : bool = sys.platform == "OSX" + IsPlatformX64 : bool = True + CompanyName : str = "DefaultCompany" + ProductName : str = "DefaultProject" + + @staticmethod + def GetApplicationPath() -> str: + """获取应用程序所在目录""" + import os + return os.path.dirname(os.path.abspath(__file__)) + + @staticmethod + def GetCurrentWorkingDirectory() -> str: + """获取当前工作目录""" + import os + return os.getcwd() + + # 使用类方法获取路径 + @classmethod + def ApplicationPath(cls) -> str: + """应用程序路径属性""" + return cls.GetApplicationPath() + + @classmethod + def StreamingAssetsPath(cls) -> str: + """流媒体资源路径属性""" + return cls.ApplicationPath() + "/StreamingAssets/" + + @staticmethod + def PersistentDataPath() -> str: + """ + 获取持久化数据路径,根据平台返回不同的路径 + """ + import os + if PlatformIndicator.IsPlatformWindows: + return os.path.expandvars(f"%userprofile%\\AppData\\LocalLow\\{PlatformIndicator.CompanyName}\\{PlatformIndicator.ProductName}\\") + elif PlatformIndicator.IsPlatformLinux: + return os.path.expandvars("$HOME/.config/") + return "" + + @staticmethod + def DataPath() -> str: + """ + 获取数据路径 + """ + return "Assets/" + +class DescriptiveIndicator[T]: + def __init__(self, description:str, value:T) -> None: + self.descripion : str = description + self.value : T = value + diff --git a/Convention/Runtime/EasySave.py b/Convention/Runtime/EasySave.py new file mode 100644 index 0000000..b85ed7f --- /dev/null +++ b/Convention/Runtime/EasySave.py @@ -0,0 +1,408 @@ +from .Reflection import * +from .File import ToolFile +from .String import LimitStringLength + +_Internal_EasySave_Debug:bool = False +def GetInternalEasySaveDebug() -> bool: + return _Internal_EasySave_Debug and GetInternalDebug() +def SetInternalEasySaveDebug(debug:bool) -> None: + global _Internal_EasySave_Debug + _Internal_EasySave_Debug = debug + +class EasySaveSetting(BaseModel, any_class): + key: str = Field(description="目标键", default="easy") + # 从目标文件进行序列化/反序列化 + file: str = Field(description="目标文件") + # 序列化/反序列化的格式方法 + format: Literal["json", "binary"] = Field(description="保存模式", default="json") + # TODO: refChain: bool = Field(description="是否以保留引用的方式保存", default=True) + # 文件形式与参数 + # TODO: encoding: str = Field(description="编码", default="utf-8") + is_backup: bool = Field(description="是否备份", default=True) + backup_suffix: str = Field(description="备份后缀", default=".backup") + # 序列化/反序列化时, 如果设置了忽略字段的谓词, 则被谓词选中的字段将不会工作 + # 如果设置了选择字段的谓词, 则被选中的字段才会工作 + ignore_pr: Optional[Callable[[FieldInfo], bool]] = Field(description="忽略字段的谓词", default=None) + select_pr: Optional[Callable[[FieldInfo], bool]] = Field(description="选择字段的谓词", default=None) + +class ESWriter(BaseModel, any_class): + setting: EasySaveSetting = Field(description="设置") + + @sealed + def _GetFields(self, rtype:RefType) -> List[FieldInfo]: + ''' + 获取字段 + ''' + fields: List[FieldInfo] = [] + if self.setting.ignore_pr is not None and self.setting.select_pr is not None: + fields = [ field for field in rtype.GetAllFields() if self.setting.select_pr(field) and not self.setting.ignore_pr(field) ] + elif self.setting.select_pr is None and self.setting.ignore_pr is None: + fields = rtype.GetFields() + elif self.setting.ignore_pr is not None: + fields = [ field for field in rtype.GetAllFields() if not self.setting.ignore_pr(field) ] + else: + fields = [ field for field in rtype.GetAllFields() if self.setting.select_pr(field) ] + return fields + + @sealed + def _DoJsonSerialize(self, result_file:ToolFile, rtype:RefType, rinstance:Any) -> Any: + ''' + 序列化: json格式 + ''' + + def dfs(rtype:RefType, rinstance:Any) -> Dict[str, Any]|Any: + if rinstance is None: + return rinstance + + if rtype.IsUnion: + rtype = TypeManager.GetInstance().CreateOrGetRefType(rinstance) + + if rtype.IsValueType: + return rinstance + elif rtype.IsCollection: + try: + if rtype.IsList: + return [ dfs(TypeManager.GetInstance().CreateOrGetRefType(iter_), iter_) for iter_ in rinstance ] + elif rtype.IsSet: + return { dfs(TypeManager.GetInstance().CreateOrGetRefType(iter_), iter_) for iter_ in rinstance } + elif rtype.IsTuple: + return tuple(dfs(TypeManager.GetInstance().CreateOrGetRefType(iter_), iter_) for iter_ in rinstance) + elif rtype.IsDictionary: + return { + dfs(TypeManager.GetInstance().CreateOrGetRefType(key), key): + dfs(TypeManager.GetInstance().CreateOrGetRefType(iter_), iter_) + for key, iter_ in rinstance.items() + } + except Exception as e: + raise ReflectionException(f"{ConsoleFrontColor.RED}容器<{rtype.RealType}>"\ + f"在序列化时遇到错误:{ConsoleFrontColor.RESET}\n{e}") from e + raise NotImplementedError(f"{ConsoleFrontColor.RED}不支持的容器: {rinstance}"\ + f"<{rtype.print_str(verbose=GetInternalEasySaveDebug())}>{ConsoleFrontColor.RESET}") + elif hasattr(rtype.RealType, "__easy_serialize__"): + custom_data, is_need_type = rtype.RealType.__easy_serialize__(rinstance) + if is_need_type: + return { + "__type": AssemblyTypen(rtype.RealType), + **custom_data + } + else: + return custom_data + else: + fields: List[FieldInfo] = self._GetFields(rtype) + layer: Dict[str, Any] = { + "__type": AssemblyTypen(rtype.RealType) + } + for field in fields: + try: + layer[field.FieldName] = dfs( + TypeManager.GetInstance().CreateOrGetRefType(field.FieldType), + field.GetValue(rinstance) + ) + except Exception as e: + raise ReflectionException(f"{ConsoleFrontColor.RED}字段{field.FieldName}"\ + f"<{field.FieldType}>在序列化时遇到错误:{ConsoleFrontColor.RESET}\n{e}") from e + return layer + + layers: Dict[str, Any] = {} + if result_file.exists(): + filedata = result_file.load() + if isinstance(filedata, dict): + layers = filedata + layers[self.setting.key] = { + "__type": AssemblyTypen(rtype.RealType), + "value": dfs(rtype, rinstance) + } + result_file.data = layers + result_file.save_as_json() + + @sealed + def _DoBinarySerialize(self, result_file:ToolFile, rinstance:Any) -> Any: + ''' + 序列化: 二进制格式 + ''' + result_file.data = rinstance + result_file.save_as_binary() + + @virtual + def Serialize(self, result_file:ToolFile, rtype:RefType, rinstance:Any) -> Any: + ''' + 序列化 + ''' + if self.setting.format == "json": + self._DoJsonSerialize(result_file, rtype, rinstance) + elif self.setting.format == "binary": + self._DoBinarySerialize(result_file, rinstance) + else: + raise NotImplementedError(f"不支持的格式: {self.setting.format}") + + @virtual + def Write[T](self, rinstance:T) -> ToolFile: + ''' + 写入数据 + ''' + result_file: ToolFile = ToolFile(self.setting.file) + backup_file: ToolFile = None + if result_file.dirpath is not None and not ToolFile(result_file.dirpath).exists(): + raise FileNotFoundError(f"文件路径不存在: {result_file.dirpath}") + if result_file.exists() and self.setting.is_backup: + if result_file.dirpath is not None: + backup_file = result_file.dirpath | (result_file.get_filename(True) + self.setting.backup_suffix) + else: + backup_file = ToolFile(result_file.get_filename(True) + self.setting.backup_suffix) + result_file.copy(backup_file) + try: + self.Serialize(result_file, TypeManager.GetInstance().CreateOrGetRefType(rinstance), rinstance) + except Exception: + if backup_file is not None: + result_file.remove() + backup_file.copy(result_file) + backup_file.remove() + raise + finally: + if backup_file is not None: + backup_file.remove() + return result_file + +class ESReader(BaseModel, any_class): + setting: EasySaveSetting = Field(description="设置") + + @sealed + def _GetFields(self, rtype:RefType) -> List[FieldInfo]: + ''' + 获取字段 + ''' + fields: List[FieldInfo] = [] + if self.setting.ignore_pr is not None and self.setting.select_pr is not None: + fields = [ field for field in rtype.GetAllFields() if self.setting.select_pr(field) and not self.setting.ignore_pr(field) ] + elif self.setting.select_pr is None and self.setting.ignore_pr is None: + fields = rtype.GetFields() + elif self.setting.ignore_pr is not None: + fields = [ field for field in rtype.GetAllFields() if not self.setting.ignore_pr(field) ] + else: + fields = [ field for field in rtype.GetAllFields() if self.setting.select_pr(field) ] + return fields + + def get_rtype_from_typen(self, type_label:str) -> RefType: + ''' + 从类型标签中获取类型 + ''' + #module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.') + #if GetInternalEasySaveDebug(): + # print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\ + # f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\ + # f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}") + #typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name) + #return TypeManager.GetInstance().CreateOrGetRefType(typen_to) + typen, assembly_name = ReadAssemblyTypen(type_label) + if GetInternalEasySaveDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\ + f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\ + f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}") + return TypeManager.GetInstance().CreateOrGetRefType(typen) + + @sealed + def _DoJsonDeserialize(self, read_file:ToolFile, rtype:Optional[RefType] = None) -> Any: + ''' + 反序列化: json格式 + + Args: + read_file (ToolFile): 要读取的文件对象 + rtype (Optional[RTypen[Any]], optional): 目标类型. 如果为None, 则从文件中读取类型信息. Defaults to None. + + Returns: + Any: 反序列化后的对象 + + Raises: + NotImplementedError: 当遇到不支持的集合类型时抛出 + ValueError: 当rinstance不为None时抛出 + ''' + # 从文件中加载JSON数据 + layers: Dict[str, Any] = read_file.load_as_json() + if self.setting.key not in layers: + raise ValueError(f"{ConsoleFrontColor.RED}文件中不包含目标键: {ConsoleFrontColor.RESET}{self.setting.key}") + # 如果未指定类型, 则从JSON数据中获取类型信息 + if rtype is None: + rtype: RefType = self.get_rtype_from_typen(layers["__type"]) + layers: Dict[str, Any] = layers[self.setting.key]["value"] + result_instance: Any = None + + def dfs(rtype:Optional[RefType], layer:Dict[str, Any]|Any) -> Any: + ''' + 深度优先遍历反序列化 + + Args: + rtype (Optional[RefType]): 当前处理的类型 + layer (Dict[str, Any]|Any): 当前处理的JSON数据层 + rinstance (Any): 当前处理的对象实例 + + Returns: + Any: 反序列化后的对象 + ''' + # 如果类型为None且当前层包含类型信息, 则获取类型 + if isinstance(layer, dict) and "__type" in layer: + rtype = self.get_rtype_from_typen(layer["__type"]) + if rtype is None: + raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}") + if GetInternalEasySaveDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\ + f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}") + + # 处理值类型 + if (rtype.IsValueType or + rtype.Verify(Any) or + (layer is None and rtype.Verify(type(None))) + ): + return layer + # 处理集合类型 + elif rtype.IsCollection: + try: + if rtype.IsList: + element_type = rtype.GenericArgs[0] if len(rtype.GenericArgs) > 0 else Any + return [ dfs(TypeManager.GetInstance().CreateOrGetRefType(element_type), iter_) for iter_ in layer ] + elif rtype.IsSet: + element_type = rtype.GenericArgs[0] if len(rtype.GenericArgs) > 0 else Any + return { dfs(TypeManager.GetInstance().CreateOrGetRefType(element_type), iter_) for iter_ in layer } + elif rtype.IsTuple: + element_types: List[type] = rtype.GenericArgs + result: tuple = tuple(None for _ in layer) + if element_types is None or len(element_types) == 0: + element_types = [Any] * len(layer) + for index, iter_ in enumerate(layer): + result[index] = dfs(TypeManager.GetInstance().CreateOrGetRefType(element_types[index]), iter_) + return result + elif rtype.IsDictionary: + element_key, element_value = (rtype.GenericArgs[0], rtype.GenericArgs[1]) if len(rtype.GenericArgs) > 1 else (Any, Any) + return { + dfs(TypeManager.GetInstance().CreateOrGetRefType(element_key), keyname): + dfs(TypeManager.GetInstance().CreateOrGetRefType(element_value), iter_) + for keyname, iter_ in layer.items() + } + except Exception as e: + raise ReflectionException(f"容器<{LimitStringLength(str(layer), 100)}>在反序列化时遇到错误:\n{e}") from e + raise NotImplementedError(f"{ConsoleFrontColor.RED}不支持的容器: {LimitStringLength(str(layer), 100)}"\ + f"<{rtype.print_str(verbose=GetInternalEasySaveDebug())}>{ConsoleFrontColor.RESET}") + # 处理对象类型 + elif isinstance(rtype.RealType, type) and hasattr(rtype.RealType, "__easy_deserialize__"): + return rtype.RealType.__easy_deserialize__(layer) + else: + rinstance = rtype.CreateInstance() + if GetInternalEasySaveDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\ + f"{rtype.print_str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}") + fields:List[FieldInfo] = self._GetFields(rtype) + for field in fields: + if field.FieldName not in layer: + continue + field_rtype:RefType = None + try: + if field.FieldType == list and field.ValueType.IsGeneric: + field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0])) + if GetInternalEasySaveDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ + f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\ + f"{field_rtype.GenericArgs[0]}>") + elif field.FieldType == set and field.ValueType.IsGeneric: + field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0])) + if GetInternalEasySaveDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ + f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\ + f"{field_rtype.GenericArgs[0]}>") + elif field.FieldType == tuple and field.ValueType.IsGeneric: + field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0])) + if GetInternalEasySaveDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ + f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\ + f"{field_rtype.GenericArgs[0]}>") + elif field.FieldType == dict and field.ValueType.IsGeneric: + field_rtype = TypeManager.GetInstance().CreateOrGetRefType( + DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1]) + ) + if GetInternalEasySaveDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ + f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\ + f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>") + else: + field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType) + if GetInternalEasySaveDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ + f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\ + f"<{field_rtype.GenericArgs}>") + field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName])) + except Exception as e: + raise ReflectionException(f"Json字段{field.FieldName}={LimitStringLength(str(layer[field.FieldName]), 100)}: \n{e}") from e + return rinstance + + # 从根节点开始反序列化 + result_instance = dfs(rtype, layers) + return result_instance + + @sealed + def _DoBinaryDeserialize(self, read_file:ToolFile, rtype:RefType) -> Any: + ''' + 反序列化: 二进制格式 + ''' + return read_file.load_as_binary() + + @virtual + def Deserialize(self, read_file:ToolFile, rtype:Optional[RefType]=None) -> Any: + ''' + 反序列化 + ''' + if self.setting.format == "json": + return self._DoJsonDeserialize(read_file, rtype) + elif self.setting.format == "binary": + return self._DoBinaryDeserialize(read_file, rtype) + else: + raise NotImplementedError(f"不支持的格式: {self.setting.format}") + + @virtual + def Read[T](self, rtype:Optional[RTypen[T]]=None) -> T: + ''' + 读取数据 + ''' + read_file: ToolFile = ToolFile(self.setting.file) + if not read_file.exists(): + raise FileNotFoundError(f"文件不存在: {read_file}") + if read_file.is_dir(): + raise IsADirectoryError(f"文件是目录: {read_file}") + return self.Deserialize(read_file, rtype) + +class EasySave(any_class): + @staticmethod + def Write[T](rinstance:T, file:tool_file_or_str=None, *, setting:Optional[EasySaveSetting]=None) -> ToolFile: + ''' + 写入数据 + ''' + return ESWriter(setting=(setting if setting is not None else EasySaveSetting(file=UnwrapperFile2Str(file)))).Write(rinstance) + + @overload + @staticmethod + def Read[T]( + rtype: Typen[T], + file: tool_file_or_str = None, + *, + setting: Optional[EasySaveSetting] = None + ) -> T: + ... + @overload + @staticmethod + def Read[T]( + rtype: RTypen[T], + file: tool_file_or_str = None, + *, + setting: Optional[EasySaveSetting] = None + ) -> T: + ... + @staticmethod + def Read[T]( + rtype: RTypen[T]|type, + file: tool_file_or_str = None, + *, + setting: Optional[EasySaveSetting] = None + ) -> T: + ''' + 读取数据 + ''' + if isinstance(rtype, type): + rtype = TypeManager.GetInstance().CreateOrGetRefType(rtype) + return ESReader(setting=(setting if setting is not None else EasySaveSetting(file=UnwrapperFile2Str(file)))).Read(rtype) diff --git a/Convention/Runtime/File.py b/Convention/Runtime/File.py new file mode 100644 index 0000000..07f0cb9 --- /dev/null +++ b/Convention/Runtime/File.py @@ -0,0 +1,1290 @@ +from .Config import * +import json +import shutil +import pandas as pd +import os +import sys +import pickle +import zipfile +import tarfile +import base64 +import hashlib +import time +import datetime +import stat +from typing import * +from pathlib import Path +try: + from pydub import AudioSegment +except ImportError: + ImportingThrow("File", ["pydub"]) +try: + from PIL import Image, ImageFile +except ImportError: + ImportingThrow("File", ["Pillow"]) +try: + from docx import Document + from docx.document import Document as DocumentObject +except ImportError: + ImportingThrow("File", ["python-docx"]) + +from .String import Bytes2String + +def get_extension_name(file:str): + return os.path.splitext(file)[1][1:] + +def get_base_filename(file:str): + return os.path.basename(file) + +dir_name_type = str +file_name_type = str + +class FileOperationError(Exception): + """文件操作异常基类""" + pass + +class CompressionError(FileOperationError): + """压缩操作异常""" + pass + +class EncryptionError(FileOperationError): + """加密操作异常""" + pass + +class HashError(FileOperationError): + """哈希计算异常""" + pass + +class FileMonitorError(FileOperationError): + """文件监控异常""" + pass + +class BackupError(FileOperationError): + """备份操作异常""" + pass + +class PermissionError(FileOperationError): + """权限操作异常""" + pass + +try: + from pydantic import BaseModel, GetCoreSchemaHandler + from pydantic_core import core_schema +except ImportError: + class BaseModel: + def __init__(self,*args,**kwargs)->None: + pass + type GetCoreSchemaHandler = Any + type core_schema = Any + +class ToolFile(BaseModel): + OriginFullPath:str + + @classmethod + def __get_pydantic_core_schema__( + cls, + _source_type: Any, + _handler: GetCoreSchemaHandler, + ) -> core_schema.CoreSchema: + return core_schema.no_info_after_validator_function( + cls, + core_schema.any_schema(), + serialization=core_schema.plain_serializer_function_ser_schema( + lambda instance: None + ), + ) + + + def __init__( + self, + filePath: Union[str, Self], + ): + self.OriginFullPath = filePath if isinstance(filePath, str) else filePath.OriginFullPath + def __del__(self): + self.close() + def __str__(self): + return self.GetFullPath() + def __setitem__(self, key:str, value): + self.datas[key] = value + def __getitem__(self, key:str): + if key not in self.datas: + self.datas[key] = None + return self.datas[key] + def __contains__(self, key:str): + return key in self.datas + def __delitem__(self, key:str): + del self.datas[key] + def __iter__(self): + return iter(self.datas) + def __len__(self): + return len(self.datas) + @override + def __enter__(self): + if self.is_open(): + return self + if self.exists() and self.is_file(): + self.load() + return self + @override + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return super().__exit__(exc_type, exc_val, exc_tb) + + def __or__(self, other): + if other is None: + return ToolFile(self.GetFullPath() if self.is_dir() else self.GetFullPath()+"\\") + else: + return ToolFile(os.path.join(self.GetFullPath(), UnWrapper(other))) + def __idiv__(self, other): + self.close() + temp = self.__or__(other) + self.OriginFullPath = temp.GetFullPath() + + def __eq__(self, other) -> bool: + """ + 判断文件路径是否相等 + 注意字符串可能不同,因为文件夹路径后缀的斜线可能被忽略 + + Args: + other: 另一个文件对象或路径字符串 + + Returns: + 是否相等 + """ + if other is None: + return False + + # 获取比较对象的路径 + other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other) + self_path = self.OriginFullPath + + # 标准化路径,移除末尾的斜线 + if self_path.endswith('/') or self_path.endswith('\\'): + self_path = self_path[:-1] + if other_path.endswith('/') or other_path.endswith('\\'): + other_path = other_path[:-1] + + # 使用系统的路径规范化函数进行比较 + return os.path.normpath(self_path) == os.path.normpath(other_path) + + + def to_path(self): + return Path(self.OriginFullPath) + def __Path__(self): + return Path(self.OriginFullPath) + + def write(self, data:Union[str, bytes]): + self.__file.write(data) + + def create(self): + if self.exists() == False: + if self.is_dir(): + if os.path.exists(self.get_dir()): + os.makedirs(self.OriginFullPath) + else: + raise FileNotFoundError(f"{self.OriginFullPath} cannt create, because its parent path is not exist") + else: + self.open('w') + self.close() + return self + def exists(self): + return os.path.exists(self.OriginFullPath) + def remove(self): + self.close() + if self.exists(): + if self.is_dir(): + shutil.rmtree(self.OriginFullPath) + else: + os.remove(self.OriginFullPath) + return self + def copy(self, to_path:Optional[Union[Self, str]]=None): + if to_path is None: + return ToolFile(self.OriginFullPath) + if self.exists() is False: + raise FileNotFoundError("file not found") + self.close() + target_file = ToolFile(UnWrapper(to_path)) + if target_file.is_dir(): + target_file = target_file|self.GetFilename() + shutil.copy(self.OriginFullPath, UnWrapper(target_file)) + return target_file + def move(self, to_path:Union[Self, str]): + if self.exists() is False: + raise FileNotFoundError("file not found") + self.close() + target_file = ToolFile(UnWrapper(to_path)) + if target_file.is_dir(): + target_file = target_file|self.GetFilename() + shutil.move(self.OriginFullPath, UnWrapper(target_file)) + self.OriginFullPath = target_file.OriginFullPath + return self + def rename(self, newpath:Union[Self, str]): + if self.exists() is False: + raise FileNotFoundError("file not found") + self.close() + newpath:str = UnWrapper(newpath) + if '\\' in newpath or '/' in newpath: + newpath = get_base_filename(newpath) + new_current_path = os.path.join(self.get_dir(), newpath) + os.rename(self.OriginFullPath, new_current_path) + self.OriginFullPath = new_current_path + return self + + def refresh(self): + self.load() + return self + def open(self, mode='r', is_refresh=False, encoding:str='utf-8', *args, **kwargs): + self.close() + if 'b' in mode: + self.__file = open(self.OriginFullPath, mode, *args, **kwargs) + else: + self.__file = open(self.OriginFullPath, mode, encoding=encoding, *args, **kwargs) + if is_refresh: + self.refresh() + return self.__file + def close(self): + if self.__file: + self.__file.close() + if self.__datas_lit_key in self.datas: + self.datas[self.__datas_lit_key] = None + return self.__file + def is_open(self)->bool: + return self.__file is not None + + def load(self): + if self.OriginFullPath is None: + if os.path.exists(temp_tool_file_path_name): + self.data = pickle.load(open(temp_tool_file_path_name, 'rb')) + return self.data + else: + raise FileNotFoundError(f"{self.OriginFullPath} not found, but this ToolFile's target is None") + elif self.is_dir(): + self.__file = open(os.path.join(self.GetFullPath(), temp_tool_file_path_name), 'rb') + self.data = pickle.load(self.__file) + return self.data + suffix = self.get_extension() + if suffix == 'json': + self.load_as_json() + elif suffix == 'csv': + self.load_as_csv() + elif suffix == 'xml': + self.load_as_xml() + elif suffix == 'xlsx' or suffix == 'xls': + self.load_as_excel() + elif suffix in text_readable_file_type: + self.load_as_text() + elif suffix == 'docx' or suffix == 'doc': + self.load_as_docx() + elif suffix in audio_file_type: + self.load_as_audio() + elif is_image_file(self.OriginFullPath): + self.load_as_image() + elif is_binary_file(self.OriginFullPath): + self.load_as_binary() + else: + self.load_as_unknown(suffix) + return self.data + def load_as_json(self) -> pd.DataFrame: + if self.is_open() is False or 'w' in self.__file.mode: + self.open('r') + json_data = json.load(self.__file) + #try: + # from pydantic import BaseModel + # if "__type" in json_data and "pydantic.BaseModel" in json_data["__type"]: + # del json_data["__type"] + # json_data = BaseModel.model_validate(json_data) + #except: + # pass + self.data = json_data + return self.data + def load_as_csv(self) -> pd.DataFrame: + if self.is_open() is False or 'w' in self.__file.mode: + self.open('r') + self.data = pd.read_csv(self.__file) + return self.data + def load_as_xml(self) -> pd.DataFrame: + if self.is_open() is False or 'w' in self.__file.mode: + self.open('r') + self.data = pd.read_xml(self.__file) + return self.data + def load_as_dataframe(self) -> pd.DataFrame: + if self.is_open() is False or 'w' in self.__file.mode: + self.open('r') + self.data = pd.read_csv(self.__file) + return self.data + def load_as_excel(self) -> pd.DataFrame: + if self.is_open() is False or 'w' in self.__file.mode: + self.open('r') + self.data = pd.read_excel(self.__file) + return self.data + def load_as_binary(self) -> bytes: + if self.is_open() is False or 'w' in self.__file.mode: + self.open('rb') + self.data = self.__file.read() + return self.data + def load_as_text(self) -> str: + if self.is_open() is False or 'w' in self.__file.mode: + self.open('r') + self.data = list_byte_to_string(self.__file.readlines()) + return self.data + def load_as_wav(self): + self.data = AudioSegment.from_wav(self.OriginFullPath) + return self.data + def load_as_audio(self): + self.data = AudioSegment.from_file(self.OriginFullPath) + return self.data + def load_as_image(self) -> ImageFile.ImageFile: + self.data = Image.open(self.OriginFullPath) + return self.data + def load_as_docx(self) -> DocumentObject: + self.data = Document(self.OriginFullPath) + return self.data + def load_as_unknown(self, suffix:str) -> Any: + return self.load_as_text() + def load_as_model(self, model:type[BaseModel]) -> BaseModel: + return model.model_validate(self.load_as_json()) + + def save(self, path:Optional[str]=None): + if path is None and self.OriginFullPath is None: + raise Exception('No file path specified') + elif path is None and self.is_dir(): + with open(os.path.join(self.OriginFullPath, temp_tool_file_path_name),'wb') as temp_file: + pickle.dump(self.data, temp_file) + return self + suffix = self.get_extension(path) + if suffix == 'json': + self.save_as_json(path) + elif suffix == 'csv': + self.save_as_csv(path) + elif suffix == 'xml': + self.save_as_xml(path) + elif suffix == 'xlsx' or suffix == 'xls': + self.save_as_excel(path) + elif suffix in text_readable_file_type: + self.save_as_text(path) + elif suffix == 'docx': + self.save_as_docx(path) + elif suffix in audio_file_type: + self.save_as_audio(path, suffix) + elif is_binary_file(self.OriginFullPath): + self.save_as_binary(path) + elif is_image_file(self.OriginFullPath): + self.save_as_image(path) + else: + self.save_as_unknown(path) + return self + def save_as_json(self, path:Optional[str]=None): + json_data = self.data + try: + from pydantic import BaseModel + if isinstance(json_data, BaseModel): + json_data = json_data.model_dump() + json_data["__type"] = f"{self.data.__class__.__name__}, pydantic.BaseModel" + except: + pass + path = path if path is not None else self.OriginFullPath + self.close() + with open(path, 'w', encoding='utf-8') as f: + json.dump(json_data, f, indent=4) + return self + def save_as_csv(self, path:Optional[str]=None): + path = path if path is not None else self.OriginFullPath + self.data.to_csv(path) + return self + def save_as_xml(self, path:Optional[str]=None): + path = path if path is not None else self.OriginFullPath + self.data.to_xml(path) + return self + def save_as_dataframe(self, path:Optional[str]=None): + path = path if path is not None else self.OriginFullPath + self.data.to_csv(path) + return self + def save_as_excel(self, path:Optional[str]=None): + path = path if path is not None else self.OriginFullPath + self.data.to_excel(path, index=False) + return self + def save_as_binary(self, path:Optional[str]=None): + if path is not None: + with open(path, 'wb') as f: + f.write(self.data) + f.flush() + else: + if self.is_open() is False or 'r' in self.__file.mode: + self.open('wb') + self.__file.write(self.data) + self.__file.flush() + return self + def save_as_text(self, path:Optional[str]=None): + if path is not None: + with open(path, 'w') as f: + f.writelines(self.data) + f.flush() + else: + if self.is_open() is False or 'r' in self.__file.mode: + self.open('w') + self.__file.writelines(self.data) + self.__file.flush() + return self + def save_as_audio(self, path:Optional[str]=None): + path = path if path is not None else self.OriginFullPath + self.data.export(path, format=self.get_extension(path)) + return self + def save_as_image(self, path:Optional[str]=None): + path = path if path is not None else self.OriginFullPath + self.data.save(path) + return self + def save_as_docx(self, path:Optional[str]=None): + if self.data is str: + self.data = Document() + table = self.data.add_table(rows=1, cols=1) + table.cell(0, 0).text = self.data + path = path if path is not None else self.OriginFullPath + self.data.save(path) + return self + def save_as_unknown(self, path:Optional[str]=None): + self.save_as_text(path) + def save_as_model(self, model:type[BaseModel], path:Optional[str]=None): + self.save_as_json(path) + + def get_size(self) -> int: + ''' + return: + return size of directory + ''' + return os.path.getsize(self.OriginFullPath) + def get_data_type(self) -> type: + return type(self.data) + def has_data_type_is(self, types:Union[type, Sequence[type]]) -> bool: + if isinstance(types, Sequence) is False: + return self.get_data_type() == types + return self.get_data_type() in types + def get_extension(self, path:str=None): + if self.is_dir() and path is None: + raise Exception("Cannot get extension of a directory") + path = path if path is not None else self.OriginFullPath + if path is None: + raise Exception("Cannot get extension without target path") + return get_extension_name(path) + def GetFullPath(self) -> str: + return self.OriginFullPath + def GetFilename(self, is_without_extension = False): + ''' + if target path is a file, it return filename + if target path is a directory, it return top directory name + ''' + if is_without_extension and '.' in self.OriginFullPath: + return get_base_filename(self.OriginFullPath)[:-(len(self.get_extension())+1)] + elif self.OriginFullPath[-1] == '/' or self.OriginFullPath[-1] == '\\': + return get_base_filename(self.OriginFullPath[:-1]) + else: + return get_base_filename(self.OriginFullPath) + def get_dir(self): + return os.path.dirname(self.OriginFullPath) + def get_dir_tool_file(self): + return ToolFile(self.get_dir()) + def get_current_dir_name(self): + return os.path.dirname(self.OriginFullPath) + + def is_dir(self): + if self.OriginFullPath[-1] == '\\' or self.GetFullPath()[-1] == '/': + return True + else: + return os.path.isdir(self.OriginFullPath) + def is_file(self): + return os.path.isfile(self.OriginFullPath) + def is_binary_file(self): + return is_binary_file(self.__file) + def is_image(self): + return is_image_file(self.OriginFullPath) + + def try_create_parent_path(self): + dir_path = os.path.dirname(self.OriginFullPath) + if dir_path == '': + return self + if not os.path.exists(dir_path): + os.makedirs(dir_path) + return self + def dir_iter(self): + return os.listdir(self.OriginFullPath) + def dir_tool_file_iter(self): + result = [self] + result.clear() + for file in os.listdir(self.OriginFullPath): + result.append(self|file) + return result + def back_to_parent_dir(self): + self.close() + self.OriginFullPath = self.get_dir() + return self + def get_parent_dir(self): + return ToolFile(self.get_dir()) + def dir_count(self, ignore_folder:bool = True): + iter = self.dir_iter() + result = 0 + for content in iter: + if ignore_folder and os.path.isdir(os.path.join(self.OriginFullPath, content)): + continue + result += 1 + return result + def dir_clear(self): + for file in self.dir_tool_file_iter(): + file.remove() + return self + def first_file_with_extension(self, extension:str): + target_dir = self if self.is_dir() else ToolFile(self.get_dir()) + for file in target_dir.dir_tool_file_iter(): + if file.is_dir() is False and file.get_extension() == extension: + return file + return None + def first_file(self, pr:Callable[[str], bool]): + target_dir = self if self.is_dir() else ToolFile(self.get_dir()) + for file in target_dir.dir_tool_file_iter(): + if pr(file.GetFilename()): + return file + return None + def find_file_with_extension(self, extension:str): + target_dir = self if self.is_dir() else ToolFile(self.get_dir()) + result:List[ToolFile] = [] + for file in target_dir.dir_tool_file_iter(): + if file.is_dir() is False and file.get_extension() == extension: + result.append(file) + return result + def find_file(self, pr:Callable[[str], bool]): + target_dir = self if self.is_dir() else ToolFile(self.get_dir()) + result:List[ToolFile] = [] + for file in target_dir.dir_tool_file_iter(): + if pr(file.GetFilename()): + result.append(file) + return result + def dir_walk( + self, + top, + topdown: bool = True, + onerror: Optional[Callable] = None, + followlinks: bool = False + ) -> Iterator[tuple[dir_name_type, list[dir_name_type], list[file_name_type]]]: + return os.walk(self.OriginFullPath, top=top, topdown=topdown, onerror=onerror, followlinks=followlinks) + + def append_text(self, line:str): + if self.has_data_type_is(type(str)): + self.data += line + elif self.has_data_type_is(type(DocumentObject)): + self.data.add_paragraph(line) + else: + raise TypeError(f"Unsupported data type for {sys._getframe().f_code.co_name}") + return self + + def bool(self): + return self.exists() + def __bool__(self): + return self.exists() + + def must_exists_path(self): + self.close() + self.try_create_parent_path() + self.create() + return self + + def make_file_inside(self, data:Self, is_delete_source = False): + if self.is_dir() is False: + raise Exception("Cannot make file inside a file, because this object target is not a directory") + result = self|data.GetFilename() + if is_delete_source: + data.move(result) + else: + data.copy(result) + return self + + @property + def extension(self): + if self.is_dir(): + return None + return self.get_extension() + @property + def filename(self): + if self.is_dir(): + return None + return self.GetFilename(True) + @property + def dirname(self): + if self.is_dir(): + return self.get_current_dir_name() + return None + @property + def dirpath(self): + if self.is_dir(): + return self.get_current_dir_name() + return None + @property + def shortname(self): + return self.GetFilename(False) + @property + def fullpath(self): + return self.GetFullPath() + + def make_lib_path(self) -> Path: + return Path(self.OriginFullPath) + + def in_extensions(self, *args:str) -> bool: + return self.get_extension() in args + + @override + def SymbolName(self): + return f"ToolFile<{self.GetFullPath()}>" + @override + def ToString(self): + return self.GetFullPath() + + def compress(self, output_path: Optional[str] = None, format: str = 'zip') -> Self: + """ + 压缩文件或目录 + Args: + output_path: 输出路径,如果为None则使用原文件名 + format: 压缩格式,支持'zip'和'tar' + Returns: + 压缩后的文件对象 + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + if output_path is None: + output_path = self.GetFullPath() + ('.zip' if format == 'zip' else '.tar') + + try: + if format == 'zip': + with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + if self.is_dir(): + for root, _, files in os.walk(self.GetFullPath()): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, self.GetFullPath()) + zipf.write(file_path, arcname) + else: + zipf.write(self.GetFullPath(), self.GetFilename()) + elif format == 'tar': + with tarfile.open(output_path, 'w') as tarf: + if self.is_dir(): + tarf.add(self.GetFullPath(), arcname=self.GetFilename()) + else: + tarf.add(self.GetFullPath(), arcname=self.GetFilename()) + else: + raise CompressionError(f"Unsupported compression format: {format}") + + return ToolFile(output_path) + except Exception as e: + raise CompressionError(f"Compression failed: {str(e)}") + + def decompress(self, output_path: Optional[str] = None) -> Self: + """ + 解压文件 + Args: + output_path: 输出目录,如果为None则使用原文件名 + Returns: + 解压后的目录对象 + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + if output_path is None: + output_path = self.GetFullPath() + '_extracted' + + try: + if self.get_extension() == 'zip': + with zipfile.ZipFile(self.GetFullPath(), 'r') as zipf: + zipf.extractall(output_path) + elif self.get_extension() == 'tar': + with tarfile.open(self.GetFullPath(), 'r') as tarf: + tarf.extractall(output_path) + else: + raise CompressionError(f"Unsupported archive format: {self.get_extension()}") + + return ToolFile(output_path) + except Exception as e: + raise CompressionError(f"Decompression failed: {str(e)}") + + def encrypt(self, key: str, algorithm: str = 'AES') -> Self: + """ + 加密文件 + Args: + key: 加密密钥 + algorithm: 加密算法,目前支持'AES' + Returns: + 加密后的文件对象 + """ + from cryptography.fernet import Fernet + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + # 生成加密密钥 + salt = os.urandom(16) + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100000, + ) + key = base64.urlsafe_b64encode(kdf.derive(key.encode())) + + # 创建加密器 + f = Fernet(key) + + # 读取文件内容 + with open(self.GetFullPath(), 'rb') as file: + file_data = file.read() + + # 加密数据 + encrypted_data = f.encrypt(file_data) + + # 保存加密后的文件 + encrypted_path = self.GetFullPath() + '.encrypted' + with open(encrypted_path, 'wb') as file: + file.write(salt + encrypted_data) + + return ToolFile(encrypted_path) + except Exception as e: + raise EncryptionError(f"Encryption failed: {str(e)}") + + def decrypt(self, key: str, algorithm: str = 'AES') -> Self: + """ + 解密文件 + Args: + key: 解密密钥 + algorithm: 解密算法,目前支持'AES' + Returns: + 解密后的文件对象 + """ + from cryptography.fernet import Fernet + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + # 读取加密文件 + with open(self.GetFullPath(), 'rb') as file: + file_data = file.read() + + # 提取salt和加密数据 + salt = file_data[:16] + encrypted_data = file_data[16:] + + # 生成解密密钥 + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100000, + ) + key = base64.urlsafe_b64encode(kdf.derive(key.encode())) + + # 创建解密器 + f = Fernet(key) + + # 解密数据 + decrypted_data = f.decrypt(encrypted_data) + + # 保存解密后的文件 + decrypted_path = self.GetFullPath() + '.decrypted' + with open(decrypted_path, 'wb') as file: + file.write(decrypted_data) + + return ToolFile(decrypted_path) + except Exception as e: + raise EncryptionError(f"Decryption failed: {str(e)}") + + def calculate_hash(self, algorithm: str = 'md5', chunk_size: int = 8192) -> str: + """ + 计算文件的哈希值 + Args: + algorithm: 哈希算法,支持'md5', 'sha1', 'sha256', 'sha512'等 + chunk_size: 每次读取的字节数 + Returns: + 文件的哈希值(十六进制字符串) + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + # 获取哈希算法 + hash_algo = getattr(hashlib, algorithm.lower()) + if not hash_algo: + raise HashError(f"Unsupported hash algorithm: {algorithm}") + + # 创建哈希对象 + hasher = hash_algo() + + # 分块读取文件并更新哈希值 + with open(self.GetFullPath(), 'rb') as f: + while chunk := f.read(chunk_size): + hasher.update(chunk) + + return hasher.hexdigest() + except Exception as e: + raise HashError(f"Hash calculation failed: {str(e)}") + + def verify_hash(self, expected_hash: str, algorithm: str = 'md5') -> bool: + """ + 验证文件哈希值 + Args: + expected_hash: 期望的哈希值 + algorithm: 哈希算法,支持'md5', 'sha1', 'sha256', 'sha512'等 + Returns: + 是否匹配 + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + actual_hash = self.calculate_hash(algorithm) + return actual_hash.lower() == expected_hash.lower() + except Exception as e: + raise HashError(f"Hash verification failed: {str(e)}") + + def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> Self: + """ + 保存文件的哈希值到文件 + Args: + algorithm: 哈希算法 + output_path: 输出文件路径,如果为None则使用原文件名 + Returns: + 哈希值文件对象 + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + # 计算哈希值 + hash_value = self.calculate_hash(algorithm) + + # 生成输出路径 + if output_path is None: + output_path = self.GetFullPath() + f'.{algorithm}' + + # 保存哈希值 + with open(output_path, 'w') as f: + f.write(f"{hash_value} *{self.GetFilename()}") + + return ToolFile(output_path) + except Exception as e: + raise HashError(f"Hash saving failed: {str(e)}") + + def start_monitoring( + self, + callback: Callable[[str, str], None], + recursive: bool = False, + ignore_patterns: Optional[List[str]] = None, + ignore_directories: bool = False, + case_sensitive: bool = True, + is_log: bool = True + ) -> None: + """ + 开始监控文件或目录的变化 + Args: + callback: 回调函数,接收事件类型和路径两个参数 + recursive: 是否递归监控子目录 + ignore_patterns: 忽略的文件模式列表 + ignore_directories: 是否忽略目录事件 + case_sensitive: 是否区分大小写 + """ + from watchdog.observers import Observer + from watchdog.events import FileSystemEventHandler + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + class EventHandler(FileSystemEventHandler): + def __init__(self, callback, ignore_patterns, ignore_directories, case_sensitive): + self.callback = callback + self.ignore_patterns = ignore_patterns or [] + self.ignore_directories = ignore_directories + self.case_sensitive = case_sensitive + + def should_ignore(self, path: str) -> bool: + if self.ignore_directories and os.path.isdir(path): + return True + if not self.case_sensitive: + path = path.lower() + return any(pattern in path for pattern in self.ignore_patterns) + + def on_created(self, event): + if not self.should_ignore(event.src_path): + self.callback('created', event.src_path) + + def on_modified(self, event): + if not self.should_ignore(event.src_path): + self.callback('modified', event.src_path) + + def on_deleted(self, event): + if not self.should_ignore(event.src_path): + self.callback('deleted', event.src_path) + + def on_moved(self, event): + if not self.should_ignore(event.src_path): + self.callback('moved', f"{event.src_path} -> {event.dest_path}") + + # 创建事件处理器 + event_handler = EventHandler( + callback=callback, + ignore_patterns=ignore_patterns, + ignore_directories=ignore_directories, + case_sensitive=case_sensitive + ) + + # 创建观察者 + observer = Observer() + observer.schedule(event_handler, self.GetFullPath(), recursive=recursive) + + # 启动监控 + observer.start() + if is_log: + print(f"Started monitoring {self.GetFullPath()}") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + observer.stop() + if is_log: + print("Stopped monitoring") + + observer.join() + + except Exception as e: + raise FileMonitorError(f"Failed to start monitoring: {str(e)}") + + def create_backup( + self, + backup_dir: Optional[str] = None, + max_backups: int = 5, + backup_format: str = 'zip', + include_metadata: bool = True + ) -> Self: + """ + 创建文件或目录的备份 + Args: + backup_dir: 备份目录,如果为None则使用原目录下的.backup目录 + max_backups: 最大保留备份数量 + backup_format: 备份格式,支持'zip'和'tar' + include_metadata: 是否包含元数据 + Returns: + 备份文件对象 + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + # 生成备份目录 + if backup_dir is None: + backup_dir = os.path.join(self.get_dir(), '.backup') + backup_dir:Self = ToolFile(backup_dir) + backup_dir.must_exists_path() + + # 生成备份文件名 + timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') + backup_name = f"{self.GetFilename()}_{timestamp}" + + # 创建备份 + if backup_format == 'zip': + backup_path = backup_dir | f"{backup_name}.zip" + with zipfile.ZipFile(backup_path.GetFullPath(), 'w', zipfile.ZIP_DEFLATED) as zipf: + if self.is_dir(): + for root, _, files in os.walk(self.GetFullPath()): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, self.GetFullPath()) + zipf.write(file_path, arcname) + else: + zipf.write(self.GetFullPath(), self.GetFilename()) + elif backup_format == 'tar': + backup_path = backup_dir | f"{backup_name}.tar" + with tarfile.open(backup_path.GetFullPath(), 'w') as tarf: + if self.is_dir(): + tarf.add(self.GetFullPath(), arcname=self.GetFilename()) + else: + tarf.add(self.GetFullPath(), arcname=self.GetFilename()) + else: + raise BackupError(f"Unsupported backup format: {backup_format}") + + # 添加元数据 + if include_metadata: + metadata = { + 'original_path': self.GetFullPath(), + 'backup_time': timestamp, + 'file_size': self.get_size(), + 'is_directory': self.is_dir(), + 'hash': self.calculate_hash() + } + metadata_path = backup_dir | f"{backup_name}.meta.json" + with open(metadata_path.GetFullPath(), 'w') as f: + json.dump(metadata, f, indent=4) + + # 清理旧备份 + if max_backups > 0: + backups = backup_dir.find_file(lambda f: ToolFile(f).GetFilename().startswith(self.GetFilename() + '_')) + backups.sort(key=lambda f: f.GetFilename(), reverse=True) + for old_backup in backups[max_backups:]: + old_backup.remove() + + return backup_path + + except Exception as e: + raise BackupError(f"Backup failed: {str(e)}") + + def restore_backup( + self, + backup_file: Union[str, Self], + restore_path: Optional[str] = None, + verify_hash: bool = True + ) -> Self: + """ + 从备份恢复文件或目录 + Args: + backup_file: 备份文件路径 + restore_path: 恢复路径,如果为None则恢复到原位置 + verify_hash: 是否验证哈希值 + Returns: + 恢复后的文件对象 + """ + if not isinstance(backup_file, ToolFile): + backup_file:Self = ToolFile(backup_file) + + if not backup_file.exists(): + raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}") + + try: + # 确定恢复路径 + if restore_path is None: + restore_path = self.GetFullPath() + restore_path:Self = ToolFile(restore_path) + + # 解压备份 + if backup_file.get_extension() == 'zip': + with zipfile.ZipFile(backup_file.GetFullPath(), 'r') as zipf: + zipf.extractall(restore_path.GetFullPath()) + elif backup_file.get_extension() == 'tar': + with tarfile.open(backup_file.GetFullPath(), 'r') as tarf: + tarf.extractall(restore_path.GetFullPath()) + else: + raise BackupError(f"Unsupported backup format: {backup_file.get_extension()}") + + # 验证哈希值 + if verify_hash: + metadata_path = backup_file.GetFullPath()[:-len(backup_file.get_extension())-1] + '.meta.json' + if os.path.exists(metadata_path): + with open(metadata_path, 'r') as f: + metadata = json.load(f) + restored_file = ToolFile(restore_path.GetFullPath()) + if restored_file.calculate_hash() != metadata['hash']: + raise BackupError("Hash verification failed") + + return restore_path + + except Exception as e: + raise BackupError(f"Restore failed: {str(e)}") + + def list_backups(self) -> List[Self]: + """ + 列出所有备份 + Returns: + 备份文件列表 + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + backup_dir:Self = ToolFile(os.path.join(self.get_dir(), '.backup')) + if not backup_dir.exists(): + return [] + + backups = backup_dir.find_file(lambda f: ToolFile(f).GetFilename().startswith(self.GetFilename() + '_')) + backups.sort(key=lambda f: ToolFile(f).GetFilename(), reverse=True) + return backups + + except Exception as e: + raise BackupError(f"Failed to list backups: {str(e)}") + + def get_permissions(self) -> Dict[str, bool]: + """ + 获取文件或目录的权限 + Returns: + 权限字典,包含以下键: + - read: 是否可读 + - write: 是否可写 + - execute: 是否可执行 + - hidden: 是否隐藏 + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + mode = os.stat(self.GetFullPath()).st_mode + return { + 'read': bool(mode & stat.S_IRUSR), + 'write': bool(mode & stat.S_IWUSR), + 'execute': bool(mode & stat.S_IXUSR), + 'hidden': bool(os.path.isfile(self.GetFullPath()) and self.GetFilename().startswith('.')) + } + except Exception as e: + raise PermissionError(f"Failed to get permissions: {str(e)}") + + def set_permissions( + self, + read: Optional[bool] = None, + write: Optional[bool] = None, + execute: Optional[bool] = None, + hidden: Optional[bool] = None, + recursive: bool = False + ) -> Self: + """ + 设置文件或目录的权限 + Args: + read: 是否可读 + write: 是否可写 + execute: 是否可执行 + hidden: 是否隐藏 + recursive: 是否递归设置目录权限 + Returns: + 文件对象本身 + """ + if not self.exists(): + raise FileNotFoundError(f"File not found: {self.GetFullPath()}") + + try: + # 获取当前权限 + current_perms = os.stat(self.GetFullPath()).st_mode + + # 设置新权限 + if read is not None: + if read: + current_perms |= stat.S_IRUSR + else: + current_perms &= ~stat.S_IRUSR + + if write is not None: + if write: + current_perms |= stat.S_IWUSR + else: + current_perms &= ~stat.S_IWUSR + + if execute is not None: + if execute: + current_perms |= stat.S_IXUSR + else: + current_perms &= ~stat.S_IXUSR + + # 应用权限 + os.chmod(self.GetFullPath(), current_perms) + + # 设置隐藏属性 + if hidden is not None: + if os.name == 'nt': # Windows + import ctypes + if hidden: + ctypes.windll.kernel32.SetFileAttributesW(self.GetFullPath(), 2) + else: + ctypes.windll.kernel32.SetFileAttributesW(self.GetFullPath(), 0) + else: # Unix/Linux/Mac + if hidden: + if not self.GetFilename().startswith('.'): + self.rename('.' + self.GetFilename()) + else: + if self.GetFilename().startswith('.'): + self.rename(self.GetFilename()[1:]) + + # 递归设置目录权限 + if recursive and self.is_dir(): + for root, _, files in os.walk(self.GetFullPath()): + for file in files: + file_path = os.path.join(root, file) + if read is not None: + if read: + os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IRUSR) + else: + os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IRUSR) + if write is not None: + if write: + os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IWUSR) + else: + os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IWUSR) + if execute is not None: + if execute: + os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IXUSR) + else: + os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IXUSR) + + return self + + except Exception as e: + raise PermissionError(f"Failed to set permissions: {str(e)}") + + def is_readable(self) -> bool: + """ + 检查文件是否可读 + Returns: + 是否可读 + """ + return self.get_permissions()['read'] + + def is_writable(self) -> bool: + """ + 检查文件是否可写 + Returns: + 是否可写 + """ + return self.get_permissions()['write'] + + def is_executable(self) -> bool: + """ + 检查文件是否可执行 + Returns: + 是否可执行 + """ + return self.get_permissions()['execute'] + + def is_hidden(self) -> bool: + """ + 检查文件是否隐藏 + Returns: + 是否隐藏 + """ + return self.get_permissions()['hidden'] + +def WrapperFile(file) -> ToolFile: + if isinstance(file, ToolFile): + return file + else: + return ToolFile(UnWrapper(file)) + +def split_elements( + file: Union[ToolFile, str], + *, + ratios: List[float] = [1,1], + pr: Optional[Callable[[ToolFile], bool]] = None, + shuffler: Optional[Callable[[List[ToolFile]], None]] = None, + output_dirs: Optional[List[ToolFile]] = None, + output_must_exist: bool = True, + output_callback: Optional[Callable[[ToolFile], None]] = None + ) -> List[List[ToolFile]]: + if is_loss_tool_file(file): + return [] + result: List[List[ToolFile]] = tool_split_elements(WrapperFile(file).dir_tool_file_iter(), + ratios=ratios, + pr=pr, + shuffler=shuffler) + if output_dirs is None: + return result + for i in range(min(len(output_dirs), len(result))): + output_dir: ToolFile = output_dirs[i] + if output_dir.is_dir() is False: + raise Exception("Outputs must be directory") + if output_must_exist: + output_dir.must_exists_as_new() + for file in result[i]: + current = output_dirs[i].make_file_inside(file) + if output_callback: + output_callback(current) + + return result diff --git a/Convention/Runtime/Reflection.py b/Convention/Runtime/Reflection.py new file mode 100644 index 0000000..b7b3f2d --- /dev/null +++ b/Convention/Runtime/Reflection.py @@ -0,0 +1,1638 @@ +import importlib +import inspect +import types +import weakref +from enum import Enum, IntFlag +from typing import * +import typing +from .Config import * +from pydantic import BaseModel, Field, PrivateAttr +import json +import functools +import concurrent.futures +from typing import Set + +type_symbols = { + 'int' : int, + 'float' : float, + 'str' : str, + 'list' : list, + 'dict' : dict, + 'tuple' : tuple, + 'set' : set, + 'bool' : bool, + 'NoneType' : type(None), + } + +_Internal_Reflection_Debug:bool = False +def GetInternalReflectionDebug() -> bool: + return _Internal_Reflection_Debug and GetInternalDebug() +def SetInternalReflectionDebug(debug:bool) -> None: + global _Internal_Reflection_Debug + _Internal_Reflection_Debug = debug + +# 缓存get_type_from_string的结果 +_type_string_cache: Dict[str, type] = {} + +class ReflectionException(Exception): + def __init__(self, message:str): + self.message = f"{ConsoleFrontColor.RED}{message}{ConsoleFrontColor.RESET}" + super().__init__(self.message) + +def get_type_from_string(type_string:str) -> type: + """ + 根据字符串生成类型,使用缓存提高性能 + """ + # 检查缓存 + if type_string in _type_string_cache: + return _type_string_cache[type_string] + + result = None + + # 检查内置类型映射 + if type_string in type_symbols: + result = type_symbols[type_string] + # 从内置类型中获取 + elif type_string in dir(types): + result = getattr(types, type_string) + # 从当前全局命名空间获取 + elif type_string in globals(): + result = globals().get(type_string) + # 从当前模块获取 + elif type_string in dir(__import__(__name__)): + result = getattr(__import__(__name__), type_string) + # 尝试从其他模块获取 + else: + try: + if '.' not in type_string: + raise ValueError(f"Empty module name, type_string is {type_string}") + module_name, _, class_name = type_string.rpartition('.') + if not module_name: + raise ValueError(f"Empty module name, type_string is {type_string}") + + # 首先尝试直接获取模块 + try: + module = sys.modules[module_name] + except KeyError: + # 模块未加载,需要导入 + module = importlib.import_module(module_name) + + result = getattr(module, class_name) + except (ImportError, AttributeError, ValueError) as ex: + raise TypeError(f"Cannot find type '{type_string}', type_string is <{type_string}>") from ex + + # 更新缓存 + if result is not None: + _type_string_cache[type_string] = result + + return result + +@functools.lru_cache(maxsize=256) +def get_type_from_string_with_module(type_string:str, module_name:str) -> type|None: + ''' + 根据字符串生成类型,带模块名参数,使用缓存 + ''' + # 检查内置类型映射 + if type_string in type_symbols: + return type_symbols[type_string] + + # 尝试从指定模块获取 + try: + module = sys.modules.get(module_name) + if module and type_string in dir(module): + return getattr(module, type_string) + except (KeyError, AttributeError): + pass + + # 尝试从类型模块获取 + if type_string in dir(types): + return getattr(types, type_string) + + return None + +# 获取泛型参数 +def get_generic_args(type_hint: type | Any) -> tuple[type | None, tuple[type, ...] | None]: + origin = get_origin(type_hint) # 获取原始类型 + args = get_args(type_hint) # 获取泛型参数 + + if origin is None: + return None, None + return origin, args + +def is_generic(type_hint: type | Any) -> bool: + return "__origin__" in dir(type_hint) + +class _SpecialIndictaor: + pass + +class ListIndictaor(_SpecialIndictaor): + elementType:type + def __init__(self, elementType:type): + self.elementType = elementType + + def __repr__(self) -> str: + return f"ListIndictaor" + def __str__(self) -> str: + return self.__repr__() + @override + def ToString(self) -> str: + return self.__repr__() + + def __hash__(self) -> int: + return hash(List[self.elementType]) + +class DictIndictaor(_SpecialIndictaor): + keyType:type + valueType:type + def __init__(self, keyType:type, valueType:type): + self.keyType = keyType + self.valueType = valueType + + def __repr__(self) -> str: + return f"DictIndictaor" + def __str__(self) -> str: + return self.__repr__() + @override + def ToString(self) -> str: + return self.__repr__() + + def __hash__(self) -> int: + return hash(Dict[self.keyType, self.valueType]) + +class TupleIndictaor(_SpecialIndictaor): + elementTypes:Tuple[type, ...] + def __init__(self, *elementTypes:type): + self.elementTypes = elementTypes + + def __repr__(self) -> str: + return f"TupleIndictaor<{', '.join(map(str, self.elementTypes))}>" + def __str__(self) -> str: + return self.__repr__() + @override + def ToString(self) -> str: + return self.__repr__() + + def __hash__(self) -> int: + return hash(Tuple[self.elementTypes]) + +class SetIndictaor(_SpecialIndictaor): + elementType:type + def __init__(self, elementType:type): + self.elementType = elementType + + def __repr__(self) -> str: + return f"SetIndictaor" + def __str__(self) -> str: + return self.__repr__() + @override + def ToString(self) -> str: + return self.__repr__() + + def __hash__(self) -> int: + return hash(Set[self.elementType]) + + +# 添加记忆化装饰器 +def memoize(func): + cache = {} + @functools.wraps(func) + def wrapper(*args, **kwargs): + key = str(args) + str(kwargs) + if key not in cache: + cache[key] = func(*args, **kwargs) + return cache[key] + return wrapper + +# 优化to_type函数 +@memoize +def to_type( + typen: type|Any|str, + *, + module_name: str|None=None + ) -> type|List[type]|_SpecialIndictaor: + # 快速路径:如果已经是类型,直接返回 + if isinstance(typen, type): + return typen + elif isinstance(typen, _SpecialIndictaor): + return typen + elif isinstance(typen, str): + # 快速路径:检查字符串的格式 + if '.' in typen: + # 可能是带模块名的类型 + try: + module_parts = typen.split('.') + # 尝试从sys.modules中获取模块 + current_module = sys.modules + for part in module_parts[:-1]: + if part in current_module: + current_module = current_module[part] + else: + # 需要导入模块 + current_module = importlib.import_module('.'.join(module_parts[:-1])) + break + + # 获取类型 + if isinstance(current_module, dict): + # 从字典中获取 + result = current_module.get(module_parts[-1]) + if isinstance(result, type): + return result + else: + # 从模块对象中获取 + result = getattr(current_module, module_parts[-1], None) + if isinstance(result, type): + return result + except (ImportError, AttributeError): + pass + + # 回退到一般处理 + import sys + if not all(c.isalnum() or c == '.' for c in typen): + raise ValueError(f"Invalid type string: {typen}, only alphanumeric characters and dots are allowed") + type_components = typen.split(".") + type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None) + type_final = type_components[-1] + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\ + f"typen: {typen}, type_components: {type_components}") + if type_module is not None: + return sys.modules[type_module].__dict__[type_final] + else: + for module in sys.modules.values(): + if type_final in module.__dict__: + return module.__dict__[type_final] + return get_type_from_string(typen) + elif is_union(typen): + uTypes = get_union_types(typen) + uTypes = [uType for uType in uTypes if uType is not type(None)] + if len(uTypes) == 1: + return uTypes[0] + elif len(uTypes) == 0: + return type(None) + else: + return uTypes + elif hasattr(typen, '__origin__'): + oType = get_origin(typen) + if oType is list: + return ListIndictaor(get_args(typen)[0]) + elif oType is dict: + return DictIndictaor(get_args(typen)[0], get_args(typen)[1]) + elif oType is tuple: + return TupleIndictaor(*get_args(typen)) + elif oType is set: + return SetIndictaor(get_args(typen)[0]) + else: + return oType + else: + return type(typen) + +def try_to_type(typen:type|Any|str, *, module_name:str|None=None) -> type|List[type]|_SpecialIndictaor|None: + try: + return to_type(typen, module_name=module_name) + except Exception: + return None + +def is_union(type_hint: type | Any) -> bool: + return "__origin__" in dir(type_hint) and type_hint.__origin__ == Union + +def get_union_types(type_hint: type | Any) -> List[type]: + return [t for t in type_hint.__args__] + +class TypeVarIndictaor: + pass + +class AnyVarIndicator: + pass + +# 优化decay_type函数 +@memoize +def decay_type( + type_hint: type|Any, + *, + module_name: str|None=None + ) -> type|List[type]|_SpecialIndictaor: + # 快速路径:直接判断常见类型 + if isinstance(type_hint, (type, _SpecialIndictaor)): + return type_hint + + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}") + + result:type|List[type] = None + + # 处理字符串类型 + if isinstance(type_hint, str): + try: + result = to_type(type_hint, module_name=module_name) + except TypeError: + result = Any + # 处理forward reference + elif hasattr(type_hint, "__forward_arg__"): + result = to_type(type_hint.__forward_arg__, module_name=module_name) + # 处理type类型 + elif type_hint is type: + result = type_hint + # 处理union类型 + elif is_union(type_hint): + result = get_union_types(type_hint) + # 处理TypeVar + elif isinstance(type_hint, TypeVar): + result = TypeVarIndictaor + # 处理泛型类型 + elif is_generic(type_hint): + result = get_origin(type_hint) + else: + raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>") + + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"Result: {result}") + return result + +def is_just_defined_in_current_class(member_name:str, current_class:type) -> bool: + ''' + 检查成员是否只在当前类中定义,而不是在父类中定义 + ''' + # 获取当前类的所有成员 + current_members = dict(inspect.getmembers(current_class)) + if member_name not in current_members: + return False + # 获取父类的所有成员 + for baseType in current_class.__bases__: + parent_members = dict(inspect.getmembers(baseType)) + if member_name in parent_members: + return False + return True + +class light_reflection(any_class): + def __init__(self, obj:object, type_str:str=None, *args, **kwargs): + if obj is not None: + self.obj = obj + elif type_str is not None: + self.obj = self.create_instance(type_str, args, kwargs) + + @override + def SymbolName(self): + return "light_reflection" + @override + def ToString(self): + return f"ToolReflection<{type(self.obj).__name__}>" + + def get_attributes(self): + """获取对象的所有属性和它们的值""" + return {attr: getattr(self.obj, attr) for attr in dir(self.obj) if not callable(getattr(self.obj, attr)) and not attr.startswith("__")} + + def get_methods(self): + """获取对象的所有方法""" + return {method: getattr(self.obj, method) for method in dir(self.obj) if callable(getattr(self.obj, method)) and not method.startswith("__")} + + def contains_attribute(self, attr_name): + """检查对象是否具有某个属性""" + return hasattr(self.obj, attr_name) + + def contains_method(self, method_name): + """检查对象是否具有某个方法""" + return hasattr(self.obj, method_name) and callable(getattr(self.obj, method_name)) + + def call_method(self, method_name, *args, **kwargs): + """调用对象的方法""" + if self.contains_method(method_name): + return getattr(self.obj, method_name)(*args, **kwargs) + else: + raise AttributeError(f"{self.obj.__class__.__name__} object has no method '{method_name}'") + + def set_attribute(self, attr_name, value): + """设置对象的属性值""" + if self.contains_attribute(attr_name): + setattr(self.obj, attr_name, value) + else: + raise AttributeError(f"{self.obj.__class__.__name__} object has no attribute '{attr_name}'") + def set(self, field:str, value): + self.set_attribute(field, value) + + def get_attribute(self, attr_name): + """获取对象的属性值""" + if self.contains_attribute(attr_name): + return getattr(self.obj, attr_name) + else: + raise AttributeError(f"{self.obj.__class__.__name__} object has no attribute '{attr_name}'") + def get(self, field:str): + return self.get_attribute(field) + + def create_instance(self, type_string:str, *args, **kwargs): + """根据类型字符串生成类型的实例""" + type_ = get_type_from_string(type_string) + return type_(*args, **kwargs) + + def create_instance_ex(self, type_string:str, params: Union[Dict[str,object], object]={}): + """根据类型字符串生成类型的实例""" + + typen = get_type_from_string(type_string) + if type_string in type_symbols: + return typen(params) + if params is None or len(params) == 0: + return typen() + + # 获取构造函数参数信息 + constructor_params = inspect.signature(typen.__init__).parameters + if len(constructor_params) == 0: + return typen() + + # 准备构造函数参数 + init_args = {'args':None, 'kwargs':None} + for param_name, param in constructor_params.items(): + if param_name == 'self': + continue + if param_name in params: + init_args[param_name] = params[param_name] + elif param.default is not param.empty: + init_args[param_name] = param.default + elif param_name == 'args' or param_name == 'kwargs': + continue + else: + raise TypeError(f"Cannot instantiate type '{type_string}' without required parameter '{param_name}'") + + return typen(**init_args) + +class BaseInfo(BaseModel, any_class): + def __init__(self, **kwargs): + BaseModel.__init__(self, **kwargs) + any_class.__init__(self) + + @virtual + def __str__(self) -> str: + return self.ToString() + +class MemberInfo(BaseInfo): + _MemberName: str = PrivateAttr(default="") + _ParentType: Optional[type] = PrivateAttr(default=None) + _IsStatic: bool = PrivateAttr(default=False) + _IsPublic: bool = PrivateAttr(default=False) + + def __init__(self, name:str, ctype:Optional[type], is_static:bool, is_public:bool, **kwargs): + super().__init__(**kwargs) + self._MemberName = name + self._ParentType = ctype + self._IsStatic = is_static + self._IsPublic = is_public + + @property + def MemberName(self) -> str: + return self._MemberName + @property + def ParentType(self) -> type: + return self._ParentType + @property + def IsStatic(self) -> bool: + return self._IsStatic + @property + def IsPublic(self) -> bool: + return self._IsPublic + + @override + def __repr__(self) -> str: + return f"<{self.MemberName}>" + @override + def __str__(self) -> str: + return f"{self.MemberName}" + @override + def SymbolName(self) -> str: + return "MemberInfo" + @override + def ToString(self) -> str: + return f"MemberInfo" + +class ValueInfo(BaseInfo): + _RealType: Optional[Any] = PrivateAttr(default=None) + _IsPrimitive: bool = PrivateAttr(default=False) + _IsValueType: bool = PrivateAttr(default=False) + _IsCollection: bool = PrivateAttr(default=False) + _IsDictionary: bool = PrivateAttr(default=False) + _IsTuple: bool = PrivateAttr(default=False) + _IsSet: bool = PrivateAttr(default=False) + _IsList: bool = PrivateAttr(default=False) + _IsUnsupported: bool = PrivateAttr(default=False) + _GenericArgs: List[type] = PrivateAttr(default=[]) + + @property + def IsUnion(self) -> bool: + return is_union(self._RealType) + @property + def RealType(self): + return self._RealType + @property + def IsCollection(self) -> bool: + return self._IsCollection + @property + def IsPrimitive(self) -> bool: + return self._IsPrimitive + @property + def IsValueType(self) -> bool: + return self._IsValueType + @property + def IsDictionary(self) -> bool: + return self._IsDictionary + @property + def IsTuple(self) -> bool: + return self._IsTuple + @property + def IsSet(self) -> bool: + return self._IsSet + @property + def IsList(self) -> bool: + return self._IsList + @property + def IsUnsupported(self) -> bool: + return self._IsUnsupported + @property + def GenericArgs(self) -> List[type]: + return self._GenericArgs + @property + def IsGeneric(self) -> bool: + return len(self._GenericArgs) > 0 + @property + def Module(self) -> Optional[Dict[str, type]]: + return sys.modules[self.RealType.__module__] + @property + def ModuleName(self) -> str: + return self.RealType.__module__ + + def __init__(self, metaType:type|Any, generic_args:List[type]=[], **kwargs) -> None: + super().__init__(**kwargs) + self._RealType = metaType + if GetInternalReflectionDebug() and len(generic_args) > 0: + print_colorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\ + f"metaType={metaType}, generic_args={generic_args}") + self._GenericArgs = generic_args + if not isinstance(metaType, type): + return + self._IsPrimitive = ( + issubclass(metaType, int) or + issubclass(metaType, float) or + issubclass(metaType, str) or + issubclass(metaType, bool) or + issubclass(metaType, complex) #or + # issubclass(metaType, tuple) or + # issubclass(metaType, set) or + # issubclass(metaType, list) or + # issubclass(metaType, dict) + ) + self._IsValueType = ( + issubclass(metaType, int) or + issubclass(metaType, float) or + issubclass(metaType, str) or + issubclass(metaType, bool) or + issubclass(metaType, complex) + ) + self._IsCollection = ( + issubclass(metaType, list) or + issubclass(metaType, dict) or + issubclass(metaType, tuple) or + issubclass(metaType, set) + ) + self._IsDictionary = ( + issubclass(metaType, dict) + ) + self._IsTuple = ( + issubclass(metaType, tuple) + ) + self._IsSet = ( + issubclass(metaType, set) + ) + self._IsList = ( + issubclass(metaType, list) + ) + + def Verify(self, valueType:type) -> bool: + if self.IsUnsupported: + raise ReflectionException(f"Unsupported type: {self.RealType}") + if valueType is type(None): + return True + if self.IsUnion: + return any(ValueInfo(uType).Verify(valueType) for uType in get_union_types(self.RealType)) + elif self.RealType is Any: + return True + elif self.RealType is type(None): + return valueType is None or valueType is type(None) + else: + try: + return issubclass(valueType, self.RealType) + except Exception as e: + raise ReflectionException(f"Verify type {valueType} with {self.RealType}: \n{e}") from e + + def DecayToList(self) -> List[Self]: + result:List[Self] = [] + if self.IsUnion: + for uType in get_union_types(self.RealType): + result.extend(ValueInfo(uType).DecayToList()) + else: + result.append(self) + result = list(dict.fromkeys(result).keys()) + return result + + @override + def __repr__(self) -> str: + generic_args = ", ".join(self._GenericArgs) + return f"ValueInfo<{self.RealType}{f'[{generic_args}]' if generic_args else ''}>" + @override + def SymbolName(self) -> str: + return "ValueInfo" + @override + def ToString(self) -> str: + generic_args = ", ".join(self._GenericArgs) + return f"<{self.RealType}{f'[{generic_args}]' if generic_args else ''}>" + + @staticmethod + def Create( + metaType: type|Any, + *, + module_name: Optional[str] = None, + SelfType: type|Any|None = None, + **kwargs + ) -> Self: + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\ + f"metaType={metaType}, SelfType={SelfType}") + if isinstance(metaType, type): + if metaType is list: + return ValueInfo(list, [Any]) + elif metaType is dict: + return ValueInfo(dict, [Any, Any]) + elif metaType is tuple: + return ValueInfo(tuple, []) + elif metaType is set: + return ValueInfo(set, [Any]) + else: + return ValueInfo(metaType, **kwargs) + elif isinstance(metaType, str): + type_ = try_to_type(metaType, module_name=module_name) + if type_ is None: + return ValueInfo(metaType, **kwargs) + else: + return ValueInfo(type_, **kwargs) + elif metaType is Self: + if SelfType is None: + raise ReflectionException("SelfType is required when metaType is ") + return ValueInfo.Create(SelfType, **kwargs) + elif isinstance(metaType, TypeVar): + gargs = get_generic_args(metaType) + if len(gargs) == 1: + return ValueInfo(gargs[0], **kwargs) + else: + return ValueInfo(Any, **kwargs) + elif hasattr(metaType, '__origin__'): + oType = get_origin(metaType) + if oType is list: + return ValueInfo(list, [get_args(metaType)[0]]) + elif oType is dict: + return ValueInfo(dict, [get_args(metaType)[0], get_args(metaType)[1]]) + elif oType is tuple: + return ValueInfo(tuple, to_list(get_args(metaType))) + elif oType is set: + return ValueInfo(set, [get_args(metaType)[0]]) + return ValueInfo(metaType, **kwargs) + +class FieldInfo(MemberInfo): + _MetaType: Optional[ValueInfo] = PrivateAttr(default=None) + + def __init__( + self, + metaType: Any, + name: str, + ctype: type, + is_static: bool, + is_public: bool, + module_name: Optional[str] = None, + selfType: type|Any|None = None + ): + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\ + f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ") + super().__init__( + name = name, + ctype = ctype, + is_static = is_static, + is_public = is_public, + ) + self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType) + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\ + f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}") + + @property + def IsUnion(self) -> bool: + return self._MetaType.IsUnion + @property + def FieldName(self) -> str: + ''' + 字段名称 + ''' + return self.MemberName + @property + def ValueType(self): + return self._MetaType + @property + def FieldType(self): + ''' + 字段类型 + ''' + return self._MetaType.RealType + + def Verify(self, valueType:type) -> bool: + return self._MetaType.Verify(valueType) + + @virtual + def GetValue(self, obj:Any) -> Any: + if self.IsStatic: + return getattr(self.ParentType, self.MemberName) + else: + if not isinstance(obj, self.ParentType): + raise TypeError(f"{ConsoleFrontColor.RED}Field {ConsoleFrontColor.LIGHTBLUE_EX}{self.MemberName}"\ + f"{ConsoleFrontColor.RED} , parent type mismatch, expected {self.ParentType}, got {type(obj)}"\ + f"{ConsoleFrontColor.RESET}") + return getattr(obj, self.MemberName) + @virtual + def SetValue(self, obj:Any, value:Any) -> None: + if self.IsStatic: + if self.Verify(type(value)): + setattr(self.ParentType, self.MemberName, value) + else: + raise TypeError(f"Value type mismatch, expected {self.MetaType.RealType}, got {type(value)}") + else: + if not isinstance(obj, self.ParentType): + raise TypeError(f"Parent type mismatch, expected {self.ParentType}, got {type(obj)}") + if self.Verify(type(value)): + setattr(obj, self.MemberName, value) + else: + raise TypeError(f"{ConsoleFrontColor.RED}Field {ConsoleFrontColor.LIGHTBLUE_EX}{self.MemberName}"\ + f"{ConsoleFrontColor.RED} , value type mismatch, expected \"{self.FieldType}\""\ + f", got {type(value)}{ConsoleFrontColor.RESET}") + + @override + def __repr__(self) -> str: + return f"<{self.MemberName} type={self.FieldType}>" + @override + def SymbolName(self) -> str: + return "FieldInfo" + @override + def ToString(self) -> str: + return f"FieldInfo" + +class ParameterInfo(BaseInfo): + _MetaType: Optional[ValueInfo] = PrivateAttr(default=None) + _ParameterName: str = PrivateAttr(default="") + _IsOptional: bool = PrivateAttr(default=False) + _DefaultValue: Any = PrivateAttr(default=None) + + def __init__( + self, + metaType: Any, + name: str, + is_optional: bool, + default_value: Any, + module_name: Optional[str] = None, + selfType: type|Any|None = None, + **kwargs + ): + super().__init__(**kwargs) + self._ParameterName = name + self._IsOptional = is_optional + self._DefaultValue = default_value + self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType) + + @property + def ValueType(self): + return self._MetaType + @property + def ParameterName(self) -> str: + return self._ParameterName + @property + def ParameterType(self): + return self._MetaType.RealType + @property + def IsOptional(self) -> bool: + return self._IsOptional + @property + def DefaultValue(self) -> Any: + return self._DefaultValue + + def Verify(self, valueType:type) -> bool: + return self._MetaType.Verify(valueType) + + @override + def __repr__(self) -> str: + return f"<{self.ParameterName}>" + @override + def SymbolName(self) -> str: + return "ParameterInfo" + @override + def ToString(self) -> str: + return f"ParameterInfo" + +class MethodInfo(MemberInfo): + _ReturnType: Optional[ValueInfo] = PrivateAttr(default=None) + _Parameters: List[ParameterInfo] = PrivateAttr(default=[]) + _PositionalParameters: List[ParameterInfo] = PrivateAttr(default=[]) + _KeywordParameters: List[ParameterInfo] = PrivateAttr(default=[]) + _IsClassMethod: bool = PrivateAttr(default=False) + + def __init__( + self, + return_type: Any, + parameters: List[ParameterInfo], + positional_parameters: List[ParameterInfo], + keyword_parameters: List[ParameterInfo], + name: str, + ctype: type, + is_static: bool, + is_public: bool, + is_class_method: bool, + ): + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\ + f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})") + MemberInfo.__init__(self, name, ctype, is_static, is_public) + self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType) + self._Parameters = parameters + self._PositionalParameters = positional_parameters + self._KeywordParameters = keyword_parameters + self._IsClassMethod = is_class_method + @property + def ReturnType(self) -> ValueInfo: + return self._ReturnType.RealType + @property + def Parameters(self) -> List[ParameterInfo]: + return self._Parameters + @property + def PositionalParameters(self) -> List[ParameterInfo]: + return self._PositionalParameters + @property + def KeywordParameters(self) -> List[ParameterInfo]: + return self._KeywordParameters + @property + def IsClassMethod(self) -> bool: + return self._IsClassMethod + + @overload + def Invoke(self, obj:object, *args, **kwargs) -> object: + ''' + 调用实例方法 + ''' + ... + @overload + def Invoke(self, obj:type, *args, **kwargs) -> object: + ''' + 调用类方法 + ''' + ... + @overload + def Invoke(self, noneObj:Literal[None]|None, *args, **kwargs) -> object: + ''' + 调用静态方法 + ''' + ... + def Invoke(self, obj:object|type, *args, **kwargs) -> object: + if not self.IsStatic and obj is None: + raise TypeError("Object is None") + if not self.IsStatic and not self.IsClassMethod and not isinstance(obj, self.ParentType): + raise TypeError(f"Parent type mismatch, expected {self.ParentType}, got {type(obj)}") + if self.IsClassMethod and not isinstance(obj, type): + raise TypeError(f"Class method expected type, got {type(obj)}: {obj}") + result = None + if self.IsStatic: + method = getattr(self.ParentType, self.MemberName) + if method is None: + raise AttributeError(f"{self.ParentType} type has no method '{self.MemberName}'") + result = method(*args, **kwargs) + elif self.IsClassMethod: + method = getattr(obj, self.MemberName) + if method is None: + raise AttributeError(f"{obj} class has no method '{self.MemberName}'") + result = method(*args, **kwargs) + else: + method = getattr(obj, self.MemberName) + if method is None: + raise AttributeError(f"{self.ParentType} type has no method '{self.MemberName}'") + result = method(*args, **kwargs) + return result + @override + def SymbolName(self) -> str: + return "MethodInfo" + @override + def ToString(self) -> str: + return f"MethodInfo" + + @classmethod + def Create( + cls, + name: str, + method: Callable, + ctype: Optional[type] = None, + module_name: Optional[str] = None + ) -> Self: + ''' + 创建MethodInfo对象 + name: 方法名 + method: 方法对象 + ctype: 方法所属的类 + module_name: 模块名 + ''' + # 获取方法签名 + sig = inspect.signature(method) + is_static = isinstance(method, staticmethod) + is_class_method = isinstance(method, classmethod) + is_public = (name.startswith("__") and name.endswith("__")) or not name.startswith('_') + # 构建参数列表 + parameters:List[ParameterInfo] = [] + positional_parameters:List[ParameterInfo] = [] + keyword_parameters:List[ParameterInfo] = [] + for param_name, param in sig.parameters.items(): + if param_name in ('self', 'cls'): + continue + ptype = param.annotation if param.annotation != inspect.Parameter.empty else Any + ptype = ptype if isinstance(ptype, type) else Any + param_info = ParameterInfo( + metaType = ptype, + name = param_name, + is_optional = param.default != inspect.Parameter.empty, + default_value = param.default if param.default != inspect.Parameter.empty else None, + module_name = module_name, + selfType=ctype + ) + parameters.append(param_info) + if param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD): + positional_parameters.append(param_info) + elif param.kind == inspect.Parameter.KEYWORD_ONLY: + keyword_parameters.append(param_info) + # 构建方法信息 + return MethodInfo( + return_type = sig.return_annotation if sig.return_annotation != inspect.Signature.empty else Any, + parameters = parameters, + positional_parameters = positional_parameters, + keyword_parameters = keyword_parameters, + name = name, + ctype = ctype, + is_static = is_static, + is_public = is_public, + is_class_method = is_class_method + ) + +class RefTypeFlag(IntFlag): + Static:int = 0b00000001 + Instance:int = 0b00000010 + Public:int = 0b00000100 + Private:int = 0b00001000 + Default:int = 0b00010000 + Method:int = 0b00100000 + Field:int = 0b01000000 + Special:int = 0b10000000 + All:int = 0b11111111 + +_RefTypeLock:Dict[Any|type|_SpecialIndictaor, threading.Lock] = {} + +class RefType(ValueInfo): + _FieldInfos: List[FieldInfo] = PrivateAttr() + _MethodInfos: List[MethodInfo] = PrivateAttr() + _MemberNames: List[str] = PrivateAttr() + _BaseTypes: List[Self] = PrivateAttr(default=None) + _initialized: bool = PrivateAttr(default=False) + _BaseMemberNamesSet: Set[str] = PrivateAttr(default_factory=set) + _member_cache: Dict[Tuple[str, RefTypeFlag], Optional[MemberInfo]] = PrivateAttr(default_factory=dict) + + def __init__(self, metaType:type|_SpecialIndictaor): + if isinstance(metaType, ListIndictaor): + super().__init__(list, generic_args=[metaType.elementType]) + metaType = list + elif isinstance(metaType, DictIndictaor): + super().__init__(dict, generic_args=[metaType.keyType, metaType.valueType]) + metaType = dict + elif isinstance(metaType, TupleIndictaor): + super().__init__(tuple, generic_args=metaType.elementTypes) + metaType = tuple + elif isinstance(metaType, SetIndictaor): + super().__init__(set, generic_args=[metaType.elementType]) + metaType = set + elif is_generic(metaType): + raise NotImplementedError("Generic type is not supported") + else: + super().__init__(metaType) + + self._FieldInfos = [] + self._MethodInfos = [] + self._MemberNames = [] + self._BaseMemberNamesSet = set() + self._member_cache = {} + + # 延迟初始化标志 + self._initialized = False + + def _ensure_initialized(self): + """确保完全初始化,实现延迟加载""" + + # 初始化锁, 防止同个RefType在多线程中被重复初始化 + if self.RealType not in _RefTypeLock: + _RefTypeLock[self.RealType] = threading.Lock() + lock_guard(_RefTypeLock[self.RealType]) + + if self._initialized: + return + + metaType = self.RealType + # 初始化基类列表 - 只初始化一次 + if self._BaseTypes is None: + self._BaseTypes = [] + for baseType in metaType.__bases__: + if baseType == metaType: # 避免循环引用 + continue + self._BaseTypes.append(TypeManager.GetInstance().CreateOrGetRefType(baseType)) + + # 如果BaseMemberNamesSet为空,则填充它 + if not self._BaseMemberNamesSet: + # 使用更高效的方式收集基类成员名称 + for baseType in self._BaseTypes: + # 确保基类已初始化 + baseType._ensure_initialized() + # 直接合并集合,而不是逐个添加 + self._BaseMemberNamesSet.update(baseType._MemberNames) + self._BaseMemberNamesSet.update(baseType._BaseMemberNamesSet) + + class_var = metaType.__dict__ + # 只在需要时获取类型注解 + annotations_dict = {} + try: + annotations_dict = get_type_hints(metaType) + except (TypeError, NameError): + # 类型提示获取失败时,使用空字典 + pass + + # 减少函数调用开销的辅助函数,改为内联处理 + methods_info = [] + method_names = [] + fields_info = self._FieldInfos.copy() # 保留extensionFields + field_names = [] + + # 一次性收集所有成员,避免多次遍历 + members = inspect.getmembers(metaType) + + # 处理方法 + for name, member in members: + if name not in self._BaseMemberNamesSet: + if inspect.ismethod(member) or inspect.isfunction(member): + methods_info.append(MethodInfo.Create(name, member, ctype=metaType, module_name=self.ModuleName)) + method_names.append(name) + # 处理字段 (非方法成员) + elif not name.startswith('__'): # 排除魔术方法相关的属性 + is_static = name in class_var + is_public = (name.startswith('__') and name.endswith('__')) or not name.startswith('_') + fieldType = annotations_dict.get(name, Any) + field_info = FieldInfo( + metaType = fieldType, + name = name, + ctype = metaType, + is_static = is_static, + is_public = is_public, + module_name = self.ModuleName, + selfType=metaType + ) + fields_info.append(field_info) + field_names.append(name) + + # 处理BaseModel字段 - 这些通常有特殊的处理 + if issubclass(metaType, BaseModel): + try: + fields = getattr(metaType, 'model_fields', getattr(metaType, '__pydantic_fields__', {})) + for field_name, model_field in fields.items(): + if field_name not in self._BaseMemberNamesSet and field_name not in field_names: + fieldType = model_field.annotation if model_field.annotation is not None else Any + is_public = not getattr(model_field, 'exclude', False) + field_info = FieldInfo( + metaType=fieldType, + name=field_name, + ctype=metaType, + is_public=is_public, + is_static=False, + module_name=self.ModuleName, + selfType=metaType + ) + fields_info.append(field_info) + field_names.append(field_name) + except (AttributeError, TypeError): + pass # 忽略BaseModel相关错误 + + # 处理注释中的字段 - 只处理尚未添加的字段 + for name, annotation in annotations_dict.items(): + if name not in self._BaseMemberNamesSet and name not in field_names and not name.startswith('__'): + field_info = FieldInfo( + metaType=decay_type(annotation), + name=name, + ctype=metaType, + is_static=False, + is_public=not name.startswith('_'), + module_name=self.ModuleName, + selfType=metaType + ) + fields_info.append(field_info) + field_names.append(name) + + # 更新成员列表 + self._MethodInfos = methods_info + self._FieldInfos = fields_info + self._MemberNames = method_names + field_names + + self._initialized = True + + def _where_member(self, member:MemberInfo, flag:RefTypeFlag) -> bool: + # 如果是默认标志,直接根据常见条件判断,避免位运算 + if flag == RefTypeFlag.Default: + if isinstance(member, MethodInfo): + return member.IsPublic and (member.IsInstance or member.IsStatic) + elif isinstance(member, FieldInfo): + return member.IsPublic and member.IsInstance + return False + + # 否则进行完整的标志检查 + stats = True + if member.IsStatic: + stats &= (flag & RefTypeFlag.Static != 0) + else: + stats &= (flag & RefTypeFlag.Instance != 0) + if member.IsPublic: + stats &= (flag & RefTypeFlag.Public != 0) + else: + stats &= (flag & RefTypeFlag.Private != 0) + if isinstance(member, MethodInfo): + stats &= (flag & RefTypeFlag.Method != 0) + elif isinstance(member, FieldInfo): + stats &= (flag & RefTypeFlag.Field != 0) + if member.MemberName.startswith('__') and member.MemberName.endswith('__'): + stats &= (flag & RefTypeFlag.Special != 0) + return stats + + # 修改获取成员方法,使用缓存 + def GetField(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[FieldInfo]: + cache_key = (name, flags) + if cache_key in self._member_cache: + return self._member_cache[cache_key] + + result = next((field for field in self.GetFields(flags) if field.MemberName == name), None) + self._member_cache[cache_key] = result + return result + + def GetMethod(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[MethodInfo]: + cache_key = (name, flags) + if cache_key in self._member_cache: + return self._member_cache[cache_key] + + result = next((method for method in self.GetMethods(flags) if method.MemberName == name), None) + self._member_cache[cache_key] = result + return result + + def GetMember(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[MemberInfo]: + cache_key = (name, flags) + if cache_key in self._member_cache: + return self._member_cache[cache_key] + + result = next((member for member in self.GetMembers(flags) if member.MemberName == name), None) + self._member_cache[cache_key] = result + return result + + def GetFieldValue[T](self, obj:object, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> T: + field = self.GetField(name, flags) + if field is not None: + return field.GetValue(obj) + else: + raise ReflectionException(f"Field {name} not found") + def SetFieldValue[T](self, obj:object, name:str, value:T, flags:RefTypeFlag=RefTypeFlag.Default) -> None: + field = self.GetField(name, flags) + if field is not None: + field.SetValue(obj, value) + else: + raise ReflectionException(f"Field {name} not found") + def InvokeMethod[T](self, obj:object, name:str, flags:RefTypeFlag=RefTypeFlag.Default, *args, **kwargs) -> T: + method = self.GetMethod(name, flags) + if method is not None: + return method.Invoke(obj, *args, **kwargs) + else: + raise ReflectionException(f"Method {name} not found") + + def TryGetFieldValue[T](self, obj:object, lv:left_value_reference[T], name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> bool: + try: + lv.ref_value = self.GetFieldValue(obj, name, flags) + return True + except ReflectionException: + return False + def TrySetFieldValue[T](self, obj:object, name:str, value:T, flags:RefTypeFlag=RefTypeFlag.Default) -> bool: + try: + self.SetFieldValue(obj, name, value, flags) + return True + except ReflectionException: + return False + def TryInvokeMethod[T](self, obj:object, lv:left_value_reference[T], + name:str, flags:RefTypeFlag=RefTypeFlag.Default, *args, **kwargs) -> bool: + try: + lv.ref_value = self.InvokeMethod(obj, name, flags, *args, **kwargs) + return True + except ReflectionException: + return False + + def CreateInstance(self, *args, **kwargs) -> object: + return self.RealType(*args, **kwargs) + + @override + def __repr__(self) -> str: + return f"RefType<{self.RealType}{f'<{self.GenericArgs}>' if self.IsGeneric else ''}>" + @override + def SymbolName(self) -> str: + return "RefType" + @override + def ToString(self) -> str: + return f"RefType" + def print_str(self, verbose:bool=False, flags:RefTypeFlag=RefTypeFlag.Default) -> str: + fields: List[str] = [] + methods: List[str] = [] + for field in self.GetFields(flags): + fields.append(f"{ConsoleFrontColor.GREEN if field.IsPublic else ConsoleFrontColor.RED}"\ + f"{field.ToString() if verbose else field.MemberName}{ConsoleFrontColor.RESET}") + for method in self.GetMethods(flags): + methods.append(f"{ConsoleFrontColor.YELLOW if method.IsPublic else ConsoleFrontColor.RED}"\ + f"{method.ToString() if verbose else method.MemberName}{ConsoleFrontColor.RESET}") + return f"RefType" + + @sealed + def tree(self, indent:int=4) -> str: + type_set: set = set() + def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]: + if currentType.IsPrimitive: + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\ + f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") + return f"{currentType.RealType}" + elif currentType.RealType in type_set: + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\ + f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") + return { + "type": f"{currentType.RealType}", + "value": { field.FieldName: f"{field.FieldType}" for field in currentType.GetFields() } + } + else: + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\ + f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") + type_set.add(currentType.RealType) + value = {} + fields = currentType.GetFields() + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}") + for field in fields: + value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)) + return { + "type": f"{currentType.RealType}", + "value": value + } + + return json.dumps(dfs(self), indent=indent) + + @override + def __hash__(self) -> int: + """使RefType对象可哈希,基于RealType和GenericArgs的哈希值""" + return hash((self.RealType, tuple(self.GenericArgs))) + + @override + def __eq__(self, other) -> bool: + """比较两个RefType对象是否相等,基于RealType的比较""" + if not isinstance(other, RefType): + return False + return self.RealType == other.RealType + + # 添加新的优化方法,避免重复检查 + def _init_base_types_if_needed(self): + """初始化基类类型,只在需要时执行""" + if self._BaseTypes is None: + self._BaseTypes = [] + metaType = self.RealType + for baseType in metaType.__bases__: + # 避免循环引用,如果baseType是自己,则跳过 + if baseType == metaType: + continue + self._BaseTypes.append(TypeManager.GetInstance().CreateOrGetRefType(baseType)) + + # 确保正确地实现所有GetBase*方法 + @functools.lru_cache(maxsize=128) + def GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]: + if self._BaseTypes is None: + self._init_base_types_if_needed() + result = [] + for baseType in self._BaseTypes: + result.extend(baseType.GetFields(flag)) + return result + + @functools.lru_cache(maxsize=128) + def GetAllBaseFields(self) -> List[FieldInfo]: + if self._BaseTypes is None: + self._init_base_types_if_needed() + result = [] + for baseType in self._BaseTypes: + result.extend(baseType.GetAllFields()) + return result + + # 修改所有的GetBase*方法 + @functools.lru_cache(maxsize=128) + def GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]: + if self._BaseTypes is None: + self._init_base_types_if_needed() + result = [] + for baseType in self._BaseTypes: + result.extend(baseType.GetMethods(flag)) + return result + + @functools.lru_cache(maxsize=128) + def GetAllBaseMethods(self) -> List[MethodInfo]: + if self._BaseTypes is None: + self._init_base_types_if_needed() + result = [] + for baseType in self._BaseTypes: + result.extend(baseType.GetAllMethods()) + return result + + @functools.lru_cache(maxsize=128) + def GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]: + if self._BaseTypes is None: + self._init_base_types_if_needed() + result = [] + for baseType in self._BaseTypes: + result.extend(baseType.GetMembers(flag)) + return result + + @functools.lru_cache(maxsize=128) + def GetAllBaseMembers(self) -> List[MemberInfo]: + if self._BaseTypes is None: + self._init_base_types_if_needed() + result = [] + for baseType in self._BaseTypes: + result.extend(baseType.GetAllMembers()) + return result + + def GetFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]: + self._ensure_initialized() + if flag == RefTypeFlag.Default: + result = [field for field in self._FieldInfos + if self._where_member(field, RefTypeFlag.Field|RefTypeFlag.Public|RefTypeFlag.Instance)] + else: + result = [field for field in self._FieldInfos if self._where_member(field, flag)] + result.extend(self.GetBaseFields(flag)) + return result + + def GetAllFields(self) -> List[FieldInfo]: + self._ensure_initialized() + result = self._FieldInfos.copy() + result.extend(self.GetAllBaseFields()) + return result + + def GetMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]: + self._ensure_initialized() + if flag == RefTypeFlag.Default: + result = [method for method in self._MethodInfos + if self._where_member(method, RefTypeFlag.Method|RefTypeFlag.Public|RefTypeFlag.Instance|RefTypeFlag.Static)] + else: + result = [method for method in self._MethodInfos if self._where_member(method, flag)] + result.extend(self.GetBaseMethods(flag)) + return result + + def GetAllMethods(self) -> List[MethodInfo]: + self._ensure_initialized() + result = self._MethodInfos.copy() + result.extend(self.GetAllBaseMethods()) + return result + + def GetMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]: + self._ensure_initialized() + if flag == RefTypeFlag.Default: + result = [member for member in self._FieldInfos + self._MethodInfos + if self._where_member(member, RefTypeFlag.Public|RefTypeFlag.Instance|RefTypeFlag.Field|RefTypeFlag.Method)] + else: + result = [member for member in self._FieldInfos + self._MethodInfos if self._where_member(member, flag)] + result.extend(self.GetBaseMembers(flag)) + return result + + def GetAllMembers(self) -> List[MemberInfo]: + self._ensure_initialized() + result = self._FieldInfos + self._MethodInfos + result.extend(self.GetAllBaseMembers()) + return result + +type RTypen[_T] = RefType +''' +RTypen[T] 是 T 类型的 RefType +''' + +_Internal_TypeManager:Optional['TypeManager'] = None + +class TypeManager(BaseModel, any_class): + _RefTypes: Dict[type|_SpecialIndictaor, RefType] = PrivateAttr(default_factory=dict) + _is_preheated: bool = PrivateAttr(default=False) + _weak_refs: Dict[int, "weakref.ref[RefType]"] = PrivateAttr(default_factory=dict) # 使用真正的弱引用 + _type_name_cache: Dict[str, type] = PrivateAttr(default_factory=dict) # 类型名称到类型的缓存 + _string_to_type_cache: Dict[str, Any] = PrivateAttr(default_factory=dict) # 字符串到类型的缓存 + + @classmethod + def GetInstance(cls) -> Self: + global _Internal_TypeManager + if _Internal_TypeManager is None: + _Internal_TypeManager = cls() + _Internal_TypeManager._preheat_cache() + return _Internal_TypeManager + + def _preheat_cache(self): + """预热缓存,为常用类型预先创建RefType""" + if self._is_preheated: + return + + # 常用的基础类型列表 + common_types = [ + int, float, str, bool, list, dict, tuple, set, + object, type, None.__class__, Exception, BaseModel, any_class + ] + + # 预加载的类型和它们之间常见的关系,减少后续运行时计算 + for t in common_types: + self.CreateRefType(t) + # 同时缓存类型名称到类型的映射 + self._type_name_cache[t.__name__] = t + # 也缓存类型字符串到类型的映射 + self._string_to_type_cache[t.__name__] = t + # 缓存模块全限定名 + if t.__module__ != 'builtins': + full_name = f"{t.__module__}.{t.__name__}" + self._string_to_type_cache[full_name] = t + + self._is_preheated = True + + def AllRefTypes(self) -> Tuple[RefType, ...]: + return tuple(self._RefTypes.values()) + + @staticmethod + #@functools.lru_cache(maxsize=256) + def _TurnToType(data:Any, module_name:Optional[str]=None) -> type|_SpecialIndictaor: + """将任意数据转换为类型,增加缓存以提高性能""" + metaType:type|_SpecialIndictaor = None + + # 快速路径:如果已经是类型,直接返回 + if isinstance(data, type) or isinstance(data, _SpecialIndictaor): + return data + + # 处理字符串类型 + if isinstance(data, str): + # 尝试使用模块名解析类型 + if module_name is not None: + try: + return sys.modules[module_name].__dict__[data] + except (KeyError, AttributeError): + pass + + # 尝试使用to_type函数 + try: + return to_type(data, module_name=module_name) + except Exception: + pass + + # 尝试使用try_to_type函数作为回退 + metaType = try_to_type(data, module_name=module_name) + if metaType is None or isinstance(metaType, list): + metaType = data + return metaType + + @overload + def CreateOrGetRefType( + self, + type_: type, + module_name: Optional[str] = None + ) -> RefType: + ... + @overload + def CreateOrGetRefType( + self, + obj: object, + module_name: Optional[str] = None + ) -> RefType: + ... + @overload + def CreateOrGetRefType( + self, + type_str: str, + module_name: Optional[str] = None + ) -> RefType: + ... + def CreateOrGetRefType( + self, + data, + module_name: Optional[str] = None + ) -> RefType: + """创建或获取RefType实例,使用多级缓存提高性能""" + if data is None: + raise ReflectionException("data is None") + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}") + + # 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型 + if isinstance(data, str) and data in self._string_to_type_cache: + data = self._string_to_type_cache[data] + + # 获取或转换为类型 + metaType:type|_SpecialIndictaor = TypeManager._TurnToType(data, module_name=module_name) + + # 首先尝试从弱引用缓存中获取 + type_id = id(metaType) + if type_id in self._weak_refs: + ref_type = self._weak_refs[type_id]() + if ref_type is not None: + return ref_type + else: + # 如果弱引用已被回收,则从字典中删除 + del self._weak_refs[type_id] + + # 然后尝试从常规缓存中获取 + try: + ref_type = self._RefTypes[metaType] + # 添加到弱引用缓存 + self._weak_refs[type_id] = weakref.ref(ref_type) + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.YELLOW, f"Get "\ + f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\ + f"{ConsoleFrontColor.RESET}{ref_type.ToString()}") + return ref_type + except KeyError: + ref_type = self.CreateRefType(metaType, module_name=module_name) + # 添加到弱引用缓存 + self._weak_refs[type_id] = weakref.ref(ref_type) + return ref_type + + @overload + def CreateRefType( + self, + type_: type, + module_name: Optional[str] = None + ) -> RefType: + ... + @overload + def CreateRefType( + self, + obj: object, + module_name: Optional[str] = None + ) -> RefType: + ... + @overload + def CreateRefType( + self, + type_str: str, + module_name: Optional[str] = None + ) -> RefType: + ... + def CreateRefType( + self, + data, + module_name: Optional[str] = None + ) -> RefType: + """创建新的RefType实例""" + if data is None: + raise ReflectionException("data is None") + + # 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型 + if isinstance(data, str) and data in self._string_to_type_cache: + data = self._string_to_type_cache[data] + + metaType:type|_SpecialIndictaor = TypeManager._TurnToType(data, module_name=module_name) + + # 如果是字符串类型,缓存结果以供将来使用 + if isinstance(data, str) and not data in self._string_to_type_cache: + self._string_to_type_cache[data] = metaType + + try: + ref_type = RefType(metaType) + if GetInternalReflectionDebug(): + print_colorful(ConsoleFrontColor.RED, f"Create "\ + f"{ConsoleFrontColor.RESET}{metaType} "\ + f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}") + self._RefTypes[metaType] = ref_type + return ref_type + except Exception as e: + raise ReflectionException(f"Create RefType failed: {e}") + + def CreateOrGetRefTypeFromType(self, type_:type|_SpecialIndictaor) -> RefType: + """快速路径:直接从类型创建或获取RefType""" + if type_ in self._RefTypes: + return self._RefTypes[type_] + else: + return self.CreateRefType(type_) + def CreateRefTypeFromType(self, type_:type|_SpecialIndictaor) -> RefType: + """直接从类型创建RefType""" + result = self._RefTypes[type_] = RefType(type_) + return result + + diff --git a/Convention/Runtime/String.py b/Convention/Runtime/String.py new file mode 100644 index 0000000..e36df9b --- /dev/null +++ b/Convention/Runtime/String.py @@ -0,0 +1,59 @@ +from pathlib import Path +from Config import * +import re +from pathlib import Path +import xml.etree.ElementTree as ET +from xml.dom import minidom +import math + +def LimitStringLength(data, max_length:int=50) -> str: + s:str = data if data is str else str(data) + if len(s) <= max_length: + return s + else: + inside_str = "\n...\n...\n" + # 计算头尾部分的长度 + head_length = max_length // 2 + tail_length = max_length - head_length - len(inside_str) # 3 是省略号的长度 + + # 截取头尾部分并连接 + return s[:head_length] + inside_str + s[-tail_length:] + +def FillString(data:Any, + max_length: int = 50, + fill_char: str = " ", + side: Literal["left", "right", "center"] = "right" + ) -> str: + s:str = data if data is str else str(data) + char = fill_char[0] + if len(s) >= max_length: + return s + else: + if side == "left": + return s + char * (max_length - len(s)) + elif side == "right": + return char * (max_length - len(s)) + s + elif side == "center": + left = (max_length - len(s)) // 2 + right = max_length - len(s) - left + return char * left + s + char * right + else: + raise ValueError(f"Unsupported side: {side}") + +def Bytes2Strings(lines:List[bytes], encoding='utf-8') -> List[str]: + return [line.decode(encoding) for line in lines] + +def Bytes2String(lines:List[bytes], encoding='utf-8') -> str: + return "".join(Bytes2Strings(lines, encoding)) + +def word_segmentation( + sentence: Union[str, light_str, Any], + cut_all: bool = False, + HMM: bool = True, + use_paddle: bool = False + ) -> Sequence[Optional[Union[Any, str]]]: + try: + import jieba + return jieba.dt.cut(UnWrapper(sentence), cut_all=cut_all, HMM=HMM, use_paddle=use_paddle) + except ImportError: + raise ValueError("jieba is not install") diff --git a/[Test]/test.py b/[Test]/test.py new file mode 100644 index 0000000..70cac29 --- /dev/null +++ b/[Test]/test.py @@ -0,0 +1,11 @@ +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from Convention.Runtime.Config import * + +def run(): + print_colorful(ConsoleFrontColor.RED,"test") + +if __name__ == "__main__": + run() diff --git a/setup.py b/setup.py index e681960..7b515d2 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,9 @@ setup( python_requires=">=3.12", install_requires=[ "colorama", - "pydantic" + "pydantic", + "python-docx", + "Pillow" ], exclude_package_data={"": ["*.meta"]}, ) \ No newline at end of file