From 39f8320c492f07e9ae27f7f8bfd9bb5123ab4270 Mon Sep 17 00:00:00 2001 From: ninemine <1371605831@qq.com> Date: Wed, 9 Jul 2025 23:52:24 +0800 Subject: [PATCH] =?UTF-8?q?BS=200.1.0=20=E9=87=8D=E5=BB=BA=E5=8F=8D?= =?UTF-8?q?=E5=B0=84=E4=B8=8EEasySave?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Convention/Runtime/Config.py | 1 + Convention/Runtime/EasySave.py | 125 +++--- Convention/Runtime/File.py | 641 +++++++++++-------------------- Convention/Runtime/Reflection.py | 251 +++--------- setup.py | 3 +- 5 files changed, 337 insertions(+), 684 deletions(-) diff --git a/Convention/Runtime/Config.py b/Convention/Runtime/Config.py index 5fcfe88..e15200c 100644 --- a/Convention/Runtime/Config.py +++ b/Convention/Runtime/Config.py @@ -7,6 +7,7 @@ import traceback import datetime import platform import time +import os from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle def format_traceback_info(char:str='\n', back:int=1): diff --git a/Convention/Runtime/EasySave.py b/Convention/Runtime/EasySave.py index b85ed7f..c363a62 100644 --- a/Convention/Runtime/EasySave.py +++ b/Convention/Runtime/EasySave.py @@ -9,42 +9,40 @@ def SetInternalEasySaveDebug(debug:bool) -> None: global _Internal_EasySave_Debug _Internal_EasySave_Debug = debug -class EasySaveSetting(BaseModel, any_class): +class EasySaveSetting(BaseModel): key: str = Field(description="目标键", default="easy") # 从目标文件进行序列化/反序列化 file: str = Field(description="目标文件") # 序列化/反序列化的格式方法 - format: Literal["json", "binary"] = Field(description="保存模式", default="json") + formatMode: Literal["json", "binary"] = Field(description="保存模式", default="json") # TODO: refChain: bool = Field(description="是否以保留引用的方式保存", default=True) # 文件形式与参数 # TODO: encoding: str = Field(description="编码", default="utf-8") - is_backup: bool = Field(description="是否备份", default=True) + isBackup: bool = Field(description="是否备份", default=True) backup_suffix: str = Field(description="备份后缀", default=".backup") # 序列化/反序列化时, 如果设置了忽略字段的谓词, 则被谓词选中的字段将不会工作 # 如果设置了选择字段的谓词, 则被选中的字段才会工作 - ignore_pr: Optional[Callable[[FieldInfo], bool]] = Field(description="忽略字段的谓词", default=None) - select_pr: Optional[Callable[[FieldInfo], bool]] = Field(description="选择字段的谓词", default=None) + ignorePr: Optional[Callable[[FieldInfo], bool]] = Field(description="忽略字段的谓词", default=None) + selectPr: Optional[Callable[[FieldInfo], bool]] = Field(description="选择字段的谓词", default=None) -class ESWriter(BaseModel, any_class): +class ESWriter(BaseModel): setting: EasySaveSetting = Field(description="设置") - @sealed def _GetFields(self, rtype:RefType) -> List[FieldInfo]: ''' 获取字段 ''' fields: List[FieldInfo] = [] - if self.setting.ignore_pr is not None and self.setting.select_pr is not None: - fields = [ field for field in rtype.GetAllFields() if self.setting.select_pr(field) and not self.setting.ignore_pr(field) ] - elif self.setting.select_pr is None and self.setting.ignore_pr is None: + if self.setting.ignorePr is not None and self.setting.selectPr is not None: + fields = [ field for field in rtype.GetAllFields() if self.setting.selectPr(field) and not self.setting.ignorePr(field) ] + elif self.setting.selectPr is None and self.setting.ignorePr is None: fields = rtype.GetFields() - elif self.setting.ignore_pr is not None: - fields = [ field for field in rtype.GetAllFields() if not self.setting.ignore_pr(field) ] + elif self.setting.ignorePr is not None: + fields = [ field for field in rtype.GetAllFields() if not self.setting.ignorePr(field) ] else: - fields = [ field for field in rtype.GetAllFields() if self.setting.select_pr(field) ] + fields = [ field for field in rtype.GetAllFields() if self.setting.selectPr(field) ] return fields - @sealed def _DoJsonSerialize(self, result_file:ToolFile, rtype:RefType, rinstance:Any) -> Any: ''' 序列化: json格式 @@ -77,7 +75,7 @@ class ESWriter(BaseModel, any_class): raise ReflectionException(f"{ConsoleFrontColor.RED}容器<{rtype.RealType}>"\ f"在序列化时遇到错误:{ConsoleFrontColor.RESET}\n{e}") from e raise NotImplementedError(f"{ConsoleFrontColor.RED}不支持的容器: {rinstance}"\ - f"<{rtype.print_str(verbose=GetInternalEasySaveDebug())}>{ConsoleFrontColor.RESET}") + f"<{rtype.Print2Str(verbose=GetInternalEasySaveDebug())}>{ConsoleFrontColor.RESET}") elif hasattr(rtype.RealType, "__easy_serialize__"): custom_data, is_need_type = rtype.RealType.__easy_serialize__(rinstance) if is_need_type: @@ -104,85 +102,79 @@ class ESWriter(BaseModel, any_class): return layer layers: Dict[str, Any] = {} - if result_file.exists(): - filedata = result_file.load() + if result_file.Exists(): + filedata = result_file.LoadAsJson() if isinstance(filedata, dict): layers = filedata layers[self.setting.key] = { "__type": AssemblyTypen(rtype.RealType), "value": dfs(rtype, rinstance) } - result_file.data = layers - result_file.save_as_json() + result_file.SaveAsJson(layers) - @sealed def _DoBinarySerialize(self, result_file:ToolFile, rinstance:Any) -> Any: ''' 序列化: 二进制格式 ''' - result_file.data = rinstance - result_file.save_as_binary() + result_file.SaveAsBinary(rinstance) - @virtual def Serialize(self, result_file:ToolFile, rtype:RefType, rinstance:Any) -> Any: ''' 序列化 ''' - if self.setting.format == "json": + if self.setting.formatMode == "json": self._DoJsonSerialize(result_file, rtype, rinstance) - elif self.setting.format == "binary": + elif self.setting.formatMode == "binary": self._DoBinarySerialize(result_file, rinstance) else: - raise NotImplementedError(f"不支持的格式: {self.setting.format}") + raise NotImplementedError(f"不支持的格式: {self.setting.formatMode}") - @virtual def Write[T](self, rinstance:T) -> ToolFile: ''' 写入数据 ''' result_file: ToolFile = ToolFile(self.setting.file) backup_file: ToolFile = None - if result_file.dirpath is not None and not ToolFile(result_file.dirpath).exists(): - raise FileNotFoundError(f"文件路径不存在: {result_file.dirpath}") - if result_file.exists() and self.setting.is_backup: - if result_file.dirpath is not None: - backup_file = result_file.dirpath | (result_file.get_filename(True) + self.setting.backup_suffix) + if result_file.GetDir() is not None and not ToolFile(result_file.GetDir()).Exists(): + raise FileNotFoundError(f"文件路径不存在: {result_file.GetDir()}") + if result_file.Exists() and self.setting.isBackup: + if result_file.GetDir() is not None: + backup_file = ToolFile(result_file.GetDir()) | (result_file.GetFilename(True) + self.setting.backup_suffix) else: - backup_file = ToolFile(result_file.get_filename(True) + self.setting.backup_suffix) - result_file.copy(backup_file) + backup_file = ToolFile(result_file.GetFilename(True) + self.setting.backup_suffix) + result_file.Copy(backup_file) try: self.Serialize(result_file, TypeManager.GetInstance().CreateOrGetRefType(rinstance), rinstance) except Exception: if backup_file is not None: - result_file.remove() - backup_file.copy(result_file) - backup_file.remove() + result_file.Remove() + backup_file.Copy(result_file) + backup_file.Remove() raise finally: if backup_file is not None: - backup_file.remove() + backup_file.Remove() return result_file -class ESReader(BaseModel, any_class): +class ESReader(BaseModel): setting: EasySaveSetting = Field(description="设置") - @sealed def _GetFields(self, rtype:RefType) -> List[FieldInfo]: ''' 获取字段 ''' fields: List[FieldInfo] = [] - if self.setting.ignore_pr is not None and self.setting.select_pr is not None: - fields = [ field for field in rtype.GetAllFields() if self.setting.select_pr(field) and not self.setting.ignore_pr(field) ] - elif self.setting.select_pr is None and self.setting.ignore_pr is None: + if self.setting.ignorePr is not None and self.setting.selectPr is not None: + fields = [ field for field in rtype.GetAllFields() if self.setting.selectPr(field) and not self.setting.ignorePr(field) ] + elif self.setting.selectPr is None and self.setting.ignorePr is None: fields = rtype.GetFields() - elif self.setting.ignore_pr is not None: - fields = [ field for field in rtype.GetAllFields() if not self.setting.ignore_pr(field) ] + elif self.setting.ignorePr is not None: + fields = [ field for field in rtype.GetAllFields() if not self.setting.ignorePr(field) ] else: - fields = [ field for field in rtype.GetAllFields() if self.setting.select_pr(field) ] + fields = [ field for field in rtype.GetAllFields() if self.setting.selectPr(field) ] return fields - def get_rtype_from_typen(self, type_label:str) -> RefType: + def GetRtypeFromTypen(self, type_label:str) -> RefType: ''' 从类型标签中获取类型 ''' @@ -217,12 +209,12 @@ class ESReader(BaseModel, any_class): ValueError: 当rinstance不为None时抛出 ''' # 从文件中加载JSON数据 - layers: Dict[str, Any] = read_file.load_as_json() + layers: Dict[str, Any] = read_file.LoadAsJson() if self.setting.key not in layers: raise ValueError(f"{ConsoleFrontColor.RED}文件中不包含目标键: {ConsoleFrontColor.RESET}{self.setting.key}") # 如果未指定类型, 则从JSON数据中获取类型信息 if rtype is None: - rtype: RefType = self.get_rtype_from_typen(layers["__type"]) + rtype: RefType = self.GetRtypeFromTypen(layers["__type"]) layers: Dict[str, Any] = layers[self.setting.key]["value"] result_instance: Any = None @@ -240,7 +232,7 @@ class ESReader(BaseModel, any_class): ''' # 如果类型为None且当前层包含类型信息, 则获取类型 if isinstance(layer, dict) and "__type" in layer: - rtype = self.get_rtype_from_typen(layer["__type"]) + rtype = self.GetRtypeFromTypen(layer["__type"]) if rtype is None: raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}") if GetInternalEasySaveDebug(): @@ -280,7 +272,7 @@ class ESReader(BaseModel, any_class): except Exception as e: raise ReflectionException(f"容器<{LimitStringLength(str(layer), 100)}>在反序列化时遇到错误:\n{e}") from e raise NotImplementedError(f"{ConsoleFrontColor.RED}不支持的容器: {LimitStringLength(str(layer), 100)}"\ - f"<{rtype.print_str(verbose=GetInternalEasySaveDebug())}>{ConsoleFrontColor.RESET}") + f"<{rtype.Print2Str(verbose=GetInternalEasySaveDebug())}>{ConsoleFrontColor.RESET}") # 处理对象类型 elif isinstance(rtype.RealType, type) and hasattr(rtype.RealType, "__easy_deserialize__"): return rtype.RealType.__easy_deserialize__(layer) @@ -288,7 +280,7 @@ class ESReader(BaseModel, any_class): rinstance = rtype.CreateInstance() if GetInternalEasySaveDebug(): print_colorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\ - f"{rtype.print_str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}") + f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}") fields:List[FieldInfo] = self._GetFields(rtype) for field in fields: if field.FieldName not in layer: @@ -336,50 +328,47 @@ class ESReader(BaseModel, any_class): result_instance = dfs(rtype, layers) return result_instance - @sealed def _DoBinaryDeserialize(self, read_file:ToolFile, rtype:RefType) -> Any: ''' 反序列化: 二进制格式 ''' - return read_file.load_as_binary() + return read_file.LoadAsBinary() - @virtual def Deserialize(self, read_file:ToolFile, rtype:Optional[RefType]=None) -> Any: ''' 反序列化 ''' - if self.setting.format == "json": + if self.setting.formatMode == "json": return self._DoJsonDeserialize(read_file, rtype) - elif self.setting.format == "binary": + elif self.setting.formatMode == "binary": return self._DoBinaryDeserialize(read_file, rtype) else: - raise NotImplementedError(f"不支持的格式: {self.setting.format}") + raise NotImplementedError(f"不支持的格式: {self.setting.formatMode}") - @virtual def Read[T](self, rtype:Optional[RTypen[T]]=None) -> T: ''' 读取数据 ''' read_file: ToolFile = ToolFile(self.setting.file) - if not read_file.exists(): + if not read_file.Exists(): raise FileNotFoundError(f"文件不存在: {read_file}") - if read_file.is_dir(): + if read_file.IsDir(): raise IsADirectoryError(f"文件是目录: {read_file}") return self.Deserialize(read_file, rtype) -class EasySave(any_class): +class EasySave: @staticmethod - def Write[T](rinstance:T, file:tool_file_or_str=None, *, setting:Optional[EasySaveSetting]=None) -> ToolFile: + def Write[T](rinstance:T, file:Optional[ToolFile|str]=None, *, setting:Optional[EasySaveSetting]=None) -> ToolFile: ''' 写入数据 ''' - return ESWriter(setting=(setting if setting is not None else EasySaveSetting(file=UnwrapperFile2Str(file)))).Write(rinstance) + return ESWriter(setting=(setting if setting is not None else EasySaveSetting(file=str(file)))).Write(rinstance) @overload @staticmethod def Read[T]( rtype: Typen[T], - file: tool_file_or_str = None, + file: Optional[ToolFile|str] = None, *, setting: Optional[EasySaveSetting] = None ) -> T: @@ -388,7 +377,7 @@ class EasySave(any_class): @staticmethod def Read[T]( rtype: RTypen[T], - file: tool_file_or_str = None, + file: Optional[ToolFile|str] = None, *, setting: Optional[EasySaveSetting] = None ) -> T: @@ -396,7 +385,7 @@ class EasySave(any_class): @staticmethod def Read[T]( rtype: RTypen[T]|type, - file: tool_file_or_str = None, + file: Optional[ToolFile|str] = None, *, setting: Optional[EasySaveSetting] = None ) -> T: @@ -405,4 +394,4 @@ class EasySave(any_class): ''' if isinstance(rtype, type): rtype = TypeManager.GetInstance().CreateOrGetRefType(rtype) - return ESReader(setting=(setting if setting is not None else EasySaveSetting(file=UnwrapperFile2Str(file)))).Read(rtype) + return ESReader(setting=(setting if setting is not None else EasySaveSetting(file=str(file)))).Read(rtype) diff --git a/Convention/Runtime/File.py b/Convention/Runtime/File.py index 07f0cb9..b2f7ef9 100644 --- a/Convention/Runtime/File.py +++ b/Convention/Runtime/File.py @@ -16,24 +16,24 @@ from typing import * from pathlib import Path try: from pydub import AudioSegment -except ImportError: - ImportingThrow("File", ["pydub"]) +except ImportError as e: + ImportingThrow(e, "File", ["pydub"]) try: from PIL import Image, ImageFile -except ImportError: - ImportingThrow("File", ["Pillow"]) +except ImportError as e: + ImportingThrow(e, "File", ["Pillow"]) try: from docx import Document from docx.document import Document as DocumentObject -except ImportError: - ImportingThrow("File", ["python-docx"]) +except ImportError as e: + ImportingThrow(e, "File", ["python-docx"]) from .String import Bytes2String -def get_extension_name(file:str): +def GetExtensionName(file:str): return os.path.splitext(file)[1][1:] -def get_base_filename(file:str): +def GetBaseFilename(file:str): return os.path.basename(file) dir_name_type = str @@ -67,15 +67,8 @@ class PermissionError(FileOperationError): """权限操作异常""" pass -try: - from pydantic import BaseModel, GetCoreSchemaHandler - from pydantic_core import core_schema -except ImportError: - class BaseModel: - def __init__(self,*args,**kwargs)->None: - pass - type GetCoreSchemaHandler = Any - type core_schema = Any +from pydantic import BaseModel, GetCoreSchemaHandler +from pydantic_core import core_schema class ToolFile(BaseModel): OriginFullPath:str @@ -101,42 +94,20 @@ class ToolFile(BaseModel): ): self.OriginFullPath = filePath if isinstance(filePath, str) else filePath.OriginFullPath def __del__(self): - self.close() + pass def __str__(self): return self.GetFullPath() - def __setitem__(self, key:str, value): - self.datas[key] = value - def __getitem__(self, key:str): - if key not in self.datas: - self.datas[key] = None - return self.datas[key] - def __contains__(self, key:str): - return key in self.datas - def __delitem__(self, key:str): - del self.datas[key] - def __iter__(self): - return iter(self.datas) - def __len__(self): - return len(self.datas) - @override def __enter__(self): - if self.is_open(): - return self - if self.exists() and self.is_file(): - self.load() return self - @override def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - return super().__exit__(exc_type, exc_val, exc_tb) + return True def __or__(self, other): if other is None: - return ToolFile(self.GetFullPath() if self.is_dir() else self.GetFullPath()+"\\") + return ToolFile(self.GetFullPath() if self.IsDir() else self.GetFullPath()+"\\") else: - return ToolFile(os.path.join(self.GetFullPath(), UnWrapper(other))) + return ToolFile(os.path.join(self.GetFullPath(), str(other))) def __idiv__(self, other): - self.close() temp = self.__or__(other) self.OriginFullPath = temp.GetFullPath() @@ -168,213 +139,131 @@ class ToolFile(BaseModel): return os.path.normpath(self_path) == os.path.normpath(other_path) - def to_path(self): + def ToPath(self): return Path(self.OriginFullPath) def __Path__(self): return Path(self.OriginFullPath) - def write(self, data:Union[str, bytes]): - self.__file.write(data) - - def create(self): - if self.exists() == False: - if self.is_dir(): - if os.path.exists(self.get_dir()): + def Create(self): + if self.Exists() == False: + if self.IsDir(): + if os.path.exists(self.GetDir()): os.makedirs(self.OriginFullPath) else: raise FileNotFoundError(f"{self.OriginFullPath} cannt create, because its parent path is not exist") else: - self.open('w') - self.close() + with open(self.OriginFullPath, 'w') as f: + f.write('') return self - def exists(self): + def Exists(self): return os.path.exists(self.OriginFullPath) - def remove(self): - self.close() - if self.exists(): - if self.is_dir(): + def Remove(self): + if self.Exists(): + if self.IsDir(): shutil.rmtree(self.OriginFullPath) else: os.remove(self.OriginFullPath) return self - def copy(self, to_path:Optional[Union[Self, str]]=None): - if to_path is None: + def Copy(self, targetPath:Optional[Union[Self, str]]=None): + if targetPath is None: return ToolFile(self.OriginFullPath) - if self.exists() is False: + if self.Exists() is False: raise FileNotFoundError("file not found") - self.close() - target_file = ToolFile(UnWrapper(to_path)) - if target_file.is_dir(): + target_file = ToolFile(str(targetPath)) + if target_file.IsDir(): target_file = target_file|self.GetFilename() - shutil.copy(self.OriginFullPath, UnWrapper(target_file)) + shutil.copy(self.OriginFullPath, str(target_file)) return target_file - def move(self, to_path:Union[Self, str]): - if self.exists() is False: + def Move(self, targetPath:Union[Self, str]): + if self.Exists() is False: raise FileNotFoundError("file not found") - self.close() - target_file = ToolFile(UnWrapper(to_path)) - if target_file.is_dir(): + target_file = ToolFile(str(targetPath)) + if target_file.IsDir(): target_file = target_file|self.GetFilename() - shutil.move(self.OriginFullPath, UnWrapper(target_file)) + shutil.move(self.OriginFullPath, str(target_file)) self.OriginFullPath = target_file.OriginFullPath return self - def rename(self, newpath:Union[Self, str]): - if self.exists() is False: + def Rename(self, newpath:Union[Self, str]): + if self.Exists() is False: raise FileNotFoundError("file not found") - self.close() - newpath:str = UnWrapper(newpath) + newpath = str(newpath) if '\\' in newpath or '/' in newpath: - newpath = get_base_filename(newpath) - new_current_path = os.path.join(self.get_dir(), newpath) + newpath = GetBaseFilename(newpath) + new_current_path = os.path.join(self.GetDir(), newpath) os.rename(self.OriginFullPath, new_current_path) self.OriginFullPath = new_current_path return self - def refresh(self): - self.load() - return self - def open(self, mode='r', is_refresh=False, encoding:str='utf-8', *args, **kwargs): - self.close() - if 'b' in mode: - self.__file = open(self.OriginFullPath, mode, *args, **kwargs) + def LoadAsJson(self) -> Any: + if self.Exists() is False or 'w' in self.OriginFullPath: + with open(self.OriginFullPath, 'r') as f: + json_data = json.load(f) + return json_data else: - self.__file = open(self.OriginFullPath, mode, encoding=encoding, *args, **kwargs) - if is_refresh: - self.refresh() - return self.__file - def close(self): - if self.__file: - self.__file.close() - if self.__datas_lit_key in self.datas: - self.datas[self.__datas_lit_key] = None - return self.__file - def is_open(self)->bool: - return self.__file is not None + raise FileNotFoundError("file not found") - def load(self): - if self.OriginFullPath is None: - if os.path.exists(temp_tool_file_path_name): - self.data = pickle.load(open(temp_tool_file_path_name, 'rb')) - return self.data - else: - raise FileNotFoundError(f"{self.OriginFullPath} not found, but this ToolFile's target is None") - elif self.is_dir(): - self.__file = open(os.path.join(self.GetFullPath(), temp_tool_file_path_name), 'rb') - self.data = pickle.load(self.__file) - return self.data - suffix = self.get_extension() - if suffix == 'json': - self.load_as_json() - elif suffix == 'csv': - self.load_as_csv() - elif suffix == 'xml': - self.load_as_xml() - elif suffix == 'xlsx' or suffix == 'xls': - self.load_as_excel() - elif suffix in text_readable_file_type: - self.load_as_text() - elif suffix == 'docx' or suffix == 'doc': - self.load_as_docx() - elif suffix in audio_file_type: - self.load_as_audio() - elif is_image_file(self.OriginFullPath): - self.load_as_image() - elif is_binary_file(self.OriginFullPath): - self.load_as_binary() + def LoadAsCsv(self) -> pd.DataFrame: + if self.Exists() is False or 'w' in self.OriginFullPath: + with open(self.OriginFullPath, 'r') as f: + return pd.read_csv(f) else: - self.load_as_unknown(suffix) - return self.data - def load_as_json(self) -> pd.DataFrame: - if self.is_open() is False or 'w' in self.__file.mode: - self.open('r') - json_data = json.load(self.__file) - #try: - # from pydantic import BaseModel - # if "__type" in json_data and "pydantic.BaseModel" in json_data["__type"]: - # del json_data["__type"] - # json_data = BaseModel.model_validate(json_data) - #except: - # pass - self.data = json_data - return self.data - def load_as_csv(self) -> pd.DataFrame: - if self.is_open() is False or 'w' in self.__file.mode: - self.open('r') - self.data = pd.read_csv(self.__file) - return self.data - def load_as_xml(self) -> pd.DataFrame: - if self.is_open() is False or 'w' in self.__file.mode: - self.open('r') - self.data = pd.read_xml(self.__file) - return self.data - def load_as_dataframe(self) -> pd.DataFrame: - if self.is_open() is False or 'w' in self.__file.mode: - self.open('r') - self.data = pd.read_csv(self.__file) - return self.data - def load_as_excel(self) -> pd.DataFrame: - if self.is_open() is False or 'w' in self.__file.mode: - self.open('r') - self.data = pd.read_excel(self.__file) - return self.data - def load_as_binary(self) -> bytes: - if self.is_open() is False or 'w' in self.__file.mode: - self.open('rb') - self.data = self.__file.read() - return self.data - def load_as_text(self) -> str: - if self.is_open() is False or 'w' in self.__file.mode: - self.open('r') - self.data = list_byte_to_string(self.__file.readlines()) - return self.data - def load_as_wav(self): - self.data = AudioSegment.from_wav(self.OriginFullPath) - return self.data - def load_as_audio(self): - self.data = AudioSegment.from_file(self.OriginFullPath) - return self.data - def load_as_image(self) -> ImageFile.ImageFile: - self.data = Image.open(self.OriginFullPath) - return self.data - def load_as_docx(self) -> DocumentObject: - self.data = Document(self.OriginFullPath) - return self.data - def load_as_unknown(self, suffix:str) -> Any: - return self.load_as_text() - def load_as_model(self, model:type[BaseModel]) -> BaseModel: - return model.model_validate(self.load_as_json()) + raise FileNotFoundError("file not found") + def LoadAsXml(self) -> pd.DataFrame: + if self.Exists() is False or 'w' in self.OriginFullPath: + with open(self.OriginFullPath, 'r') as f: + return pd.read_xml(f) + else: + raise FileNotFoundError("file not found") + def LoadAsDataframe(self) -> pd.DataFrame: + if self.Exists() is False or 'w' in self.OriginFullPath: + with open(self.OriginFullPath, 'r') as f: + return pd.read_csv(f) + else: + raise FileNotFoundError("file not found") + def LoadAsExcel(self) -> pd.DataFrame: + if self.Exists() is False or 'w' in self.OriginFullPath: + with open(self.OriginFullPath, 'r') as f: + return pd.read_excel(f) + else: + raise FileNotFoundError("file not found") + def LoadAsBinary(self) -> bytes: + if self.Exists() is False or 'w' in self.OriginFullPath: + with open(self.OriginFullPath, 'rb') as f: + return f.read() + else: + raise FileNotFoundError("file not found") + def LoadAsText(self) -> str: + if self.Exists() is False or 'w' in self.OriginFullPath: + with open(self.OriginFullPath, 'r') as f: + return f.read() + else: + raise FileNotFoundError("file not found") + def LoadAsWav(self): + if self.Exists() is False or 'w' in self.OriginFullPath: + return AudioSegment.from_wav(self.OriginFullPath) + else: + raise FileNotFoundError("file not found") + def LoadAsAudio(self): + if self.Exists() is False or 'w' in self.OriginFullPath: + return AudioSegment.from_file(self.OriginFullPath) + else: + raise FileNotFoundError("file not found") + def LoadAsImage(self) -> ImageFile.ImageFile: + if self.Exists() is False or 'w' in self.OriginFullPath: + return Image.open(self.OriginFullPath) + else: + raise FileNotFoundError("file not found") + def LoadAsDocx(self) -> DocumentObject: + if self.Exists() is False or 'w' in self.OriginFullPath: + return Document(self.OriginFullPath) + else: + raise FileNotFoundError("file not found") + def LoadAsUnknown(self, suffix:str) -> Any: + return self.LoadAsText() + def LoadAsModel(self, model:type[BaseModel]) -> BaseModel: + return model.model_validate(self.LoadAsJson()) - def save(self, path:Optional[str]=None): - if path is None and self.OriginFullPath is None: - raise Exception('No file path specified') - elif path is None and self.is_dir(): - with open(os.path.join(self.OriginFullPath, temp_tool_file_path_name),'wb') as temp_file: - pickle.dump(self.data, temp_file) - return self - suffix = self.get_extension(path) - if suffix == 'json': - self.save_as_json(path) - elif suffix == 'csv': - self.save_as_csv(path) - elif suffix == 'xml': - self.save_as_xml(path) - elif suffix == 'xlsx' or suffix == 'xls': - self.save_as_excel(path) - elif suffix in text_readable_file_type: - self.save_as_text(path) - elif suffix == 'docx': - self.save_as_docx(path) - elif suffix in audio_file_type: - self.save_as_audio(path, suffix) - elif is_binary_file(self.OriginFullPath): - self.save_as_binary(path) - elif is_image_file(self.OriginFullPath): - self.save_as_image(path) - else: - self.save_as_unknown(path) - return self - def save_as_json(self, path:Optional[str]=None): - json_data = self.data + def SaveAsJson(self, json_data): try: from pydantic import BaseModel if isinstance(json_data, BaseModel): @@ -382,89 +271,51 @@ class ToolFile(BaseModel): json_data["__type"] = f"{self.data.__class__.__name__}, pydantic.BaseModel" except: pass - path = path if path is not None else self.OriginFullPath - self.close() - with open(path, 'w', encoding='utf-8') as f: + with open(self.OriginFullPath, 'w', encoding='utf-8') as f: json.dump(json_data, f, indent=4) return self - def save_as_csv(self, path:Optional[str]=None): - path = path if path is not None else self.OriginFullPath - self.data.to_csv(path) + def SaveAsCsv(self, csv_data:pd.DataFrame): + csv_data.to_csv(self.OriginFullPath) return self - def save_as_xml(self, path:Optional[str]=None): - path = path if path is not None else self.OriginFullPath - self.data.to_xml(path) + def SaveAsXml(self, xml_data:pd.DataFrame): + xml_data.to_xml(self.OriginFullPath) return self - def save_as_dataframe(self, path:Optional[str]=None): - path = path if path is not None else self.OriginFullPath - self.data.to_csv(path) + def SaveAsDataframe(self, dataframe_data:pd.DataFrame): + dataframe_data.to_csv(self.OriginFullPath) return self - def save_as_excel(self, path:Optional[str]=None): - path = path if path is not None else self.OriginFullPath - self.data.to_excel(path, index=False) + def SaveAsExcel(self, excel_data:pd.DataFrame): + excel_data.to_excel(self.OriginFullPath, index=False) return self - def save_as_binary(self, path:Optional[str]=None): - if path is not None: - with open(path, 'wb') as f: - f.write(self.data) - f.flush() - else: - if self.is_open() is False or 'r' in self.__file.mode: - self.open('wb') - self.__file.write(self.data) - self.__file.flush() + def SaveAsBinary(self, binary_data:bytes): + with open(self.OriginFullPath, 'wb') as f: + f.write(binary_data) return self - def save_as_text(self, path:Optional[str]=None): - if path is not None: - with open(path, 'w') as f: - f.writelines(self.data) - f.flush() - else: - if self.is_open() is False or 'r' in self.__file.mode: - self.open('w') - self.__file.writelines(self.data) - self.__file.flush() + def SaveAsText(self, text_data:str): + with open(self.OriginFullPath, 'w') as f: + f.writelines(text_data) return self - def save_as_audio(self, path:Optional[str]=None): - path = path if path is not None else self.OriginFullPath - self.data.export(path, format=self.get_extension(path)) + def SaveAsAudio(self, audio_data:AudioSegment): + audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath)) return self - def save_as_image(self, path:Optional[str]=None): - path = path if path is not None else self.OriginFullPath - self.data.save(path) + def SaveAsImage(self, image_data:ImageFile.ImageFile): + image_data.save(self.OriginFullPath) return self - def save_as_docx(self, path:Optional[str]=None): - if self.data is str: - self.data = Document() - table = self.data.add_table(rows=1, cols=1) - table.cell(0, 0).text = self.data - path = path if path is not None else self.OriginFullPath - self.data.save(path) + def SaveAsDocx(self, docx_data:DocumentObject): + docx_data.save(self.OriginFullPath) return self - def save_as_unknown(self, path:Optional[str]=None): - self.save_as_text(path) - def save_as_model(self, model:type[BaseModel], path:Optional[str]=None): - self.save_as_json(path) + def SaveAsUnknown(self, unknown_data:Any): + self.SaveAsBinary(unknown_data) + def SaveAsModel(self, model:type[BaseModel]): + self.SaveAsJson(model) - def get_size(self) -> int: + def GetSize(self) -> int: ''' return: return size of directory ''' return os.path.getsize(self.OriginFullPath) - def get_data_type(self) -> type: - return type(self.data) - def has_data_type_is(self, types:Union[type, Sequence[type]]) -> bool: - if isinstance(types, Sequence) is False: - return self.get_data_type() == types - return self.get_data_type() in types - def get_extension(self, path:str=None): - if self.is_dir() and path is None: - raise Exception("Cannot get extension of a directory") - path = path if path is not None else self.OriginFullPath - if path is None: - raise Exception("Cannot get extension without target path") - return get_extension_name(path) + def GetExtension(self): + return GetExtensionName(self.OriginFullPath) def GetFullPath(self) -> str: return self.OriginFullPath def GetFilename(self, is_without_extension = False): @@ -473,90 +324,85 @@ class ToolFile(BaseModel): if target path is a directory, it return top directory name ''' if is_without_extension and '.' in self.OriginFullPath: - return get_base_filename(self.OriginFullPath)[:-(len(self.get_extension())+1)] + return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)] elif self.OriginFullPath[-1] == '/' or self.OriginFullPath[-1] == '\\': - return get_base_filename(self.OriginFullPath[:-1]) + return GetBaseFilename(self.OriginFullPath[:-1]) else: - return get_base_filename(self.OriginFullPath) - def get_dir(self): + return GetBaseFilename(self.OriginFullPath) + def GetDir(self): return os.path.dirname(self.OriginFullPath) - def get_dir_tool_file(self): - return ToolFile(self.get_dir()) - def get_current_dir_name(self): + def GetDirToolFile(self): + return ToolFile(self.GetDir()) + def GetCurrentDirName(self): return os.path.dirname(self.OriginFullPath) - def is_dir(self): + def IsDir(self): if self.OriginFullPath[-1] == '\\' or self.GetFullPath()[-1] == '/': return True else: return os.path.isdir(self.OriginFullPath) - def is_file(self): + def IsFile(self): return os.path.isfile(self.OriginFullPath) - def is_binary_file(self): - return is_binary_file(self.__file) - def is_image(self): - return is_image_file(self.OriginFullPath) - def try_create_parent_path(self): + def TryCreateParentPath(self): dir_path = os.path.dirname(self.OriginFullPath) if dir_path == '': return self if not os.path.exists(dir_path): os.makedirs(dir_path) return self - def dir_iter(self): + def DirIter(self): return os.listdir(self.OriginFullPath) - def dir_tool_file_iter(self): + def DirToolFileIter(self): result = [self] result.clear() for file in os.listdir(self.OriginFullPath): result.append(self|file) return result - def back_to_parent_dir(self): - self.close() - self.OriginFullPath = self.get_dir() + def BackToParentDir(self): + self.OriginFullPath = self.GetDir() return self - def get_parent_dir(self): - return ToolFile(self.get_dir()) - def dir_count(self, ignore_folder:bool = True): - iter = self.dir_iter() + def GetParentDir(self): + return ToolFile(self.GetDir()) + def DirCount(self, ignore_folder:bool = True): + iter = self.DirIter() result = 0 for content in iter: if ignore_folder and os.path.isdir(os.path.join(self.OriginFullPath, content)): continue result += 1 return result - def dir_clear(self): - for file in self.dir_tool_file_iter(): - file.remove() + def DirClear(self): + for file in self.DirToolFileIter(): + file.Remove() return self - def first_file_with_extension(self, extension:str): - target_dir = self if self.is_dir() else ToolFile(self.get_dir()) - for file in target_dir.dir_tool_file_iter(): - if file.is_dir() is False and file.get_extension() == extension: + def FirstFileWithExtension(self, extension:str): + target_dir = self if self.IsDir() else ToolFile(self.GetDir()) + for file in target_dir.DirToolFileIter(): + if file.IsDir() is False and file.GetExtension() == extension: return file return None - def first_file(self, pr:Callable[[str], bool]): - target_dir = self if self.is_dir() else ToolFile(self.get_dir()) - for file in target_dir.dir_tool_file_iter(): + def FirstFile(self, pr:Callable[[str], bool]): + target_dir = self if self.IsDir() else ToolFile(self.GetDir()) + for file in target_dir.DirToolFileIter(): if pr(file.GetFilename()): return file return None - def find_file_with_extension(self, extension:str): - target_dir = self if self.is_dir() else ToolFile(self.get_dir()) + def FindFileWithExtension(self, extension:str): + target_dir = self if self.IsDir() else ToolFile(self.GetDir()) result:List[ToolFile] = [] - for file in target_dir.dir_tool_file_iter(): - if file.is_dir() is False and file.get_extension() == extension: + for file in target_dir.DirToolFileIter(): + if file.IsDir() is False and file.GetExtension() == extension: result.append(file) return result - def find_file(self, pr:Callable[[str], bool]): - target_dir = self if self.is_dir() else ToolFile(self.get_dir()) + def FindFile(self, pr:Callable[[str], bool]): + target_dir = self if self.IsDir() else ToolFile(self.GetDir()) result:List[ToolFile] = [] - for file in target_dir.dir_tool_file_iter(): + for file in target_dir.DirToolFileIter(): if pr(file.GetFilename()): result.append(file) return result - def dir_walk( + def DirWalk( self, top, topdown: bool = True, @@ -565,77 +411,28 @@ class ToolFile(BaseModel): ) -> Iterator[tuple[dir_name_type, list[dir_name_type], list[file_name_type]]]: return os.walk(self.OriginFullPath, top=top, topdown=topdown, onerror=onerror, followlinks=followlinks) - def append_text(self, line:str): - if self.has_data_type_is(type(str)): - self.data += line - elif self.has_data_type_is(type(DocumentObject)): - self.data.add_paragraph(line) - else: - raise TypeError(f"Unsupported data type for {sys._getframe().f_code.co_name}") - return self def bool(self): - return self.exists() + return self.Exists() def __bool__(self): - return self.exists() + return self.Exists() - def must_exists_path(self): - self.close() - self.try_create_parent_path() - self.create() + def MustExistsPath(self): + self.TryCreateParentPath() + self.Create() return self - def make_file_inside(self, data:Self, is_delete_source = False): - if self.is_dir() is False: + def MakeFileInside(self, data:Self, is_delete_source = False): + if self.IsDir() is False: raise Exception("Cannot make file inside a file, because this object target is not a directory") - result = self|data.GetFilename() + result:ToolFile = self|data.GetFilename() if is_delete_source: - data.move(result) + data.Move(result) else: - data.copy(result) + data.Copy(result) return self - @property - def extension(self): - if self.is_dir(): - return None - return self.get_extension() - @property - def filename(self): - if self.is_dir(): - return None - return self.GetFilename(True) - @property - def dirname(self): - if self.is_dir(): - return self.get_current_dir_name() - return None - @property - def dirpath(self): - if self.is_dir(): - return self.get_current_dir_name() - return None - @property - def shortname(self): - return self.GetFilename(False) - @property - def fullpath(self): - return self.GetFullPath() - - def make_lib_path(self) -> Path: - return Path(self.OriginFullPath) - - def in_extensions(self, *args:str) -> bool: - return self.get_extension() in args - - @override - def SymbolName(self): - return f"ToolFile<{self.GetFullPath()}>" - @override - def ToString(self): - return self.GetFullPath() - - def compress(self, output_path: Optional[str] = None, format: str = 'zip') -> Self: + def Compress(self, output_path: Optional[str] = None, format: str = 'zip') -> 'ToolFile': """ 压缩文件或目录 Args: @@ -644,7 +441,7 @@ class ToolFile(BaseModel): Returns: 压缩后的文件对象 """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") if output_path is None: @@ -653,7 +450,7 @@ class ToolFile(BaseModel): try: if format == 'zip': with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: - if self.is_dir(): + if self.IsDir(): for root, _, files in os.walk(self.GetFullPath()): for file in files: file_path = os.path.join(root, file) @@ -663,7 +460,7 @@ class ToolFile(BaseModel): zipf.write(self.GetFullPath(), self.GetFilename()) elif format == 'tar': with tarfile.open(output_path, 'w') as tarf: - if self.is_dir(): + if self.IsDir(): tarf.add(self.GetFullPath(), arcname=self.GetFilename()) else: tarf.add(self.GetFullPath(), arcname=self.GetFilename()) @@ -674,7 +471,7 @@ class ToolFile(BaseModel): except Exception as e: raise CompressionError(f"Compression failed: {str(e)}") - def decompress(self, output_path: Optional[str] = None) -> Self: + def Decompress(self, output_path: Optional[str] = None) -> 'ToolFile': """ 解压文件 Args: @@ -682,27 +479,27 @@ class ToolFile(BaseModel): Returns: 解压后的目录对象 """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") if output_path is None: output_path = self.GetFullPath() + '_extracted' try: - if self.get_extension() == 'zip': + if self.GetExtension() == 'zip': with zipfile.ZipFile(self.GetFullPath(), 'r') as zipf: zipf.extractall(output_path) - elif self.get_extension() == 'tar': + elif self.GetExtension() == 'tar': with tarfile.open(self.GetFullPath(), 'r') as tarf: tarf.extractall(output_path) else: - raise CompressionError(f"Unsupported archive format: {self.get_extension()}") + raise CompressionError(f"Unsupported archive format: {self.GetExtension()}") return ToolFile(output_path) except Exception as e: raise CompressionError(f"Decompression failed: {str(e)}") - def encrypt(self, key: str, algorithm: str = 'AES') -> Self: + def Encrypt(self, key: str, algorithm: str = 'AES') -> 'ToolFile': """ 加密文件 Args: @@ -714,7 +511,7 @@ class ToolFile(BaseModel): from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: @@ -759,7 +556,7 @@ class ToolFile(BaseModel): from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: @@ -804,7 +601,7 @@ class ToolFile(BaseModel): Returns: 文件的哈希值(十六进制字符串) """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: @@ -834,7 +631,7 @@ class ToolFile(BaseModel): Returns: 是否匹配 """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: @@ -852,7 +649,7 @@ class ToolFile(BaseModel): Returns: 哈希值文件对象 """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: @@ -891,7 +688,7 @@ class ToolFile(BaseModel): """ from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: @@ -972,15 +769,15 @@ class ToolFile(BaseModel): Returns: 备份文件对象 """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: # 生成备份目录 if backup_dir is None: - backup_dir = os.path.join(self.get_dir(), '.backup') + backup_dir = os.path.join(self.GetDir(), '.backup') backup_dir:Self = ToolFile(backup_dir) - backup_dir.must_exists_path() + backup_dir.MustExistsPath() # 生成备份文件名 timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') @@ -990,7 +787,7 @@ class ToolFile(BaseModel): if backup_format == 'zip': backup_path = backup_dir | f"{backup_name}.zip" with zipfile.ZipFile(backup_path.GetFullPath(), 'w', zipfile.ZIP_DEFLATED) as zipf: - if self.is_dir(): + if self.IsDir(): for root, _, files in os.walk(self.GetFullPath()): for file in files: file_path = os.path.join(root, file) @@ -1001,7 +798,7 @@ class ToolFile(BaseModel): elif backup_format == 'tar': backup_path = backup_dir | f"{backup_name}.tar" with tarfile.open(backup_path.GetFullPath(), 'w') as tarf: - if self.is_dir(): + if self.IsDir(): tarf.add(self.GetFullPath(), arcname=self.GetFilename()) else: tarf.add(self.GetFullPath(), arcname=self.GetFilename()) @@ -1014,7 +811,7 @@ class ToolFile(BaseModel): 'original_path': self.GetFullPath(), 'backup_time': timestamp, 'file_size': self.get_size(), - 'is_directory': self.is_dir(), + 'is_directory': self.IsDir(), 'hash': self.calculate_hash() } metadata_path = backup_dir | f"{backup_name}.meta.json" @@ -1026,7 +823,7 @@ class ToolFile(BaseModel): backups = backup_dir.find_file(lambda f: ToolFile(f).GetFilename().startswith(self.GetFilename() + '_')) backups.sort(key=lambda f: f.GetFilename(), reverse=True) for old_backup in backups[max_backups:]: - old_backup.remove() + old_backup.Remove() return backup_path @@ -1051,7 +848,7 @@ class ToolFile(BaseModel): if not isinstance(backup_file, ToolFile): backup_file:Self = ToolFile(backup_file) - if not backup_file.exists(): + if not backup_file.Exists(): raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}") try: @@ -1091,12 +888,12 @@ class ToolFile(BaseModel): Returns: 备份文件列表 """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: - backup_dir:Self = ToolFile(os.path.join(self.get_dir(), '.backup')) - if not backup_dir.exists(): + backup_dir:Self = ToolFile(os.path.join(self.GetDir(), '.backup')) + if not backup_dir.Exists(): return [] backups = backup_dir.find_file(lambda f: ToolFile(f).GetFilename().startswith(self.GetFilename() + '_')) @@ -1116,7 +913,7 @@ class ToolFile(BaseModel): - execute: 是否可执行 - hidden: 是否隐藏 """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: @@ -1149,7 +946,7 @@ class ToolFile(BaseModel): Returns: 文件对象本身 """ - if not self.exists(): + if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: @@ -1189,13 +986,13 @@ class ToolFile(BaseModel): else: # Unix/Linux/Mac if hidden: if not self.GetFilename().startswith('.'): - self.rename('.' + self.GetFilename()) + self.Rename('.' + self.GetFilename()) else: if self.GetFilename().startswith('.'): - self.rename(self.GetFilename()[1:]) + self.Rename(self.GetFilename()[1:]) # 递归设置目录权限 - if recursive and self.is_dir(): + if recursive and self.IsDir(): for root, _, files in os.walk(self.GetFullPath()): for file in files: file_path = os.path.join(root, file) @@ -1252,12 +1049,6 @@ class ToolFile(BaseModel): """ return self.get_permissions()['hidden'] -def WrapperFile(file) -> ToolFile: - if isinstance(file, ToolFile): - return file - else: - return ToolFile(UnWrapper(file)) - def split_elements( file: Union[ToolFile, str], *, @@ -1268,8 +1059,6 @@ def split_elements( output_must_exist: bool = True, output_callback: Optional[Callable[[ToolFile], None]] = None ) -> List[List[ToolFile]]: - if is_loss_tool_file(file): - return [] result: List[List[ToolFile]] = tool_split_elements(WrapperFile(file).dir_tool_file_iter(), ratios=ratios, pr=pr, @@ -1278,12 +1067,12 @@ def split_elements( return result for i in range(min(len(output_dirs), len(result))): output_dir: ToolFile = output_dirs[i] - if output_dir.is_dir() is False: + if output_dir.IsDir() is False: raise Exception("Outputs must be directory") if output_must_exist: output_dir.must_exists_as_new() for file in result[i]: - current = output_dirs[i].make_file_inside(file) + current = output_dirs[i].MakeFileInside(file) if output_callback: output_callback(current) diff --git a/Convention/Runtime/Reflection.py b/Convention/Runtime/Reflection.py index b7b3f2d..18af879 100644 --- a/Convention/Runtime/Reflection.py +++ b/Convention/Runtime/Reflection.py @@ -39,7 +39,7 @@ class ReflectionException(Exception): self.message = f"{ConsoleFrontColor.RED}{message}{ConsoleFrontColor.RESET}" super().__init__(self.message) -def get_type_from_string(type_string:str) -> type: +def String2Type(type_string:str) -> type: """ 根据字符串生成类型,使用缓存提高性能 """ @@ -84,11 +84,11 @@ def get_type_from_string(type_string:str) -> type: # 更新缓存 if result is not None: _type_string_cache[type_string] = result - - return result + return result + raise TypeError(f"Cannot find type '{type_string}', type_string is <{type_string}>") @functools.lru_cache(maxsize=256) -def get_type_from_string_with_module(type_string:str, module_name:str) -> type|None: +def StringWithModel2Type(type_string:str, module_name:str) -> type|None: ''' 根据字符串生成类型,带模块名参数,使用缓存 ''' @@ -111,7 +111,7 @@ def get_type_from_string_with_module(type_string:str, module_name:str) -> type|N return None # 获取泛型参数 -def get_generic_args(type_hint: type | Any) -> tuple[type | None, tuple[type, ...] | None]: +def GetGenericArgs(type_hint: type | Any) -> tuple[type | None, tuple[type, ...] | None]: origin = get_origin(type_hint) # 获取原始类型 args = get_args(type_hint) # 获取泛型参数 @@ -119,7 +119,7 @@ def get_generic_args(type_hint: type | Any) -> tuple[type | None, tuple[type, .. return None, None return origin, args -def is_generic(type_hint: type | Any) -> bool: +def IsGeneric(type_hint: type | Any) -> bool: return "__origin__" in dir(type_hint) class _SpecialIndictaor: @@ -134,9 +134,6 @@ class ListIndictaor(_SpecialIndictaor): return f"ListIndictaor" def __str__(self) -> str: return self.__repr__() - @override - def ToString(self) -> str: - return self.__repr__() def __hash__(self) -> int: return hash(List[self.elementType]) @@ -152,9 +149,6 @@ class DictIndictaor(_SpecialIndictaor): return f"DictIndictaor" def __str__(self) -> str: return self.__repr__() - @override - def ToString(self) -> str: - return self.__repr__() def __hash__(self) -> int: return hash(Dict[self.keyType, self.valueType]) @@ -168,9 +162,6 @@ class TupleIndictaor(_SpecialIndictaor): return f"TupleIndictaor<{', '.join(map(str, self.elementTypes))}>" def __str__(self) -> str: return self.__repr__() - @override - def ToString(self) -> str: - return self.__repr__() def __hash__(self) -> int: return hash(Tuple[self.elementTypes]) @@ -184,9 +175,6 @@ class SetIndictaor(_SpecialIndictaor): return f"SetIndictaor" def __str__(self) -> str: return self.__repr__() - @override - def ToString(self) -> str: - return self.__repr__() def __hash__(self) -> int: return hash(Set[self.elementType]) @@ -205,7 +193,7 @@ def memoize(func): # 优化to_type函数 @memoize -def to_type( +def ToType( typen: type|Any|str, *, module_name: str|None=None @@ -261,9 +249,9 @@ def to_type( for module in sys.modules.values(): if type_final in module.__dict__: return module.__dict__[type_final] - return get_type_from_string(typen) - elif is_union(typen): - uTypes = get_union_types(typen) + return String2Type(typen) + elif IsUnion(typen): + uTypes = GetUnionTypes(typen) uTypes = [uType for uType in uTypes if uType is not type(None)] if len(uTypes) == 1: return uTypes[0] @@ -286,16 +274,16 @@ def to_type( else: return type(typen) -def try_to_type(typen:type|Any|str, *, module_name:str|None=None) -> type|List[type]|_SpecialIndictaor|None: +def TryToType(typen:type|Any|str, *, module_name:str|None=None) -> type|List[type]|_SpecialIndictaor|None: try: - return to_type(typen, module_name=module_name) + return ToType(typen, module_name=module_name) except Exception: return None -def is_union(type_hint: type | Any) -> bool: +def IsUnion(type_hint: type | Any) -> bool: return "__origin__" in dir(type_hint) and type_hint.__origin__ == Union -def get_union_types(type_hint: type | Any) -> List[type]: +def GetUnionTypes(type_hint: type | Any) -> List[type]: return [t for t in type_hint.__args__] class TypeVarIndictaor: @@ -306,7 +294,7 @@ class AnyVarIndicator: # 优化decay_type函数 @memoize -def decay_type( +def DecayType( type_hint: type|Any, *, module_name: str|None=None @@ -317,29 +305,29 @@ def decay_type( if GetInternalReflectionDebug(): print_colorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}") - - result:type|List[type] = None + + result: type|List[type] = None # 处理字符串类型 if isinstance(type_hint, str): try: - result = to_type(type_hint, module_name=module_name) + result = ToType(type_hint, module_name=module_name) except TypeError: result = Any # 处理forward reference elif hasattr(type_hint, "__forward_arg__"): - result = to_type(type_hint.__forward_arg__, module_name=module_name) + result = ToType(type_hint.__forward_arg__, module_name=module_name) # 处理type类型 elif type_hint is type: result = type_hint # 处理union类型 - elif is_union(type_hint): - result = get_union_types(type_hint) + elif IsUnion(type_hint): + result = GetUnionTypes(type_hint) # 处理TypeVar elif isinstance(type_hint, TypeVar): result = TypeVarIndictaor # 处理泛型类型 - elif is_generic(type_hint): + elif IsGeneric(type_hint): result = get_origin(type_hint) else: raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>") @@ -348,7 +336,7 @@ def decay_type( print_colorful(ConsoleFrontColor.YELLOW, f"Result: {result}") return result -def is_just_defined_in_current_class(member_name:str, current_class:type) -> bool: +def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool: ''' 检查成员是否只在当前类中定义,而不是在父类中定义 ''' @@ -363,104 +351,11 @@ def is_just_defined_in_current_class(member_name:str, current_class:type) -> boo return False return True -class light_reflection(any_class): - def __init__(self, obj:object, type_str:str=None, *args, **kwargs): - if obj is not None: - self.obj = obj - elif type_str is not None: - self.obj = self.create_instance(type_str, args, kwargs) - - @override - def SymbolName(self): - return "light_reflection" - @override - def ToString(self): - return f"ToolReflection<{type(self.obj).__name__}>" - - def get_attributes(self): - """获取对象的所有属性和它们的值""" - return {attr: getattr(self.obj, attr) for attr in dir(self.obj) if not callable(getattr(self.obj, attr)) and not attr.startswith("__")} - - def get_methods(self): - """获取对象的所有方法""" - return {method: getattr(self.obj, method) for method in dir(self.obj) if callable(getattr(self.obj, method)) and not method.startswith("__")} - - def contains_attribute(self, attr_name): - """检查对象是否具有某个属性""" - return hasattr(self.obj, attr_name) - - def contains_method(self, method_name): - """检查对象是否具有某个方法""" - return hasattr(self.obj, method_name) and callable(getattr(self.obj, method_name)) - - def call_method(self, method_name, *args, **kwargs): - """调用对象的方法""" - if self.contains_method(method_name): - return getattr(self.obj, method_name)(*args, **kwargs) - else: - raise AttributeError(f"{self.obj.__class__.__name__} object has no method '{method_name}'") - - def set_attribute(self, attr_name, value): - """设置对象的属性值""" - if self.contains_attribute(attr_name): - setattr(self.obj, attr_name, value) - else: - raise AttributeError(f"{self.obj.__class__.__name__} object has no attribute '{attr_name}'") - def set(self, field:str, value): - self.set_attribute(field, value) - - def get_attribute(self, attr_name): - """获取对象的属性值""" - if self.contains_attribute(attr_name): - return getattr(self.obj, attr_name) - else: - raise AttributeError(f"{self.obj.__class__.__name__} object has no attribute '{attr_name}'") - def get(self, field:str): - return self.get_attribute(field) - - def create_instance(self, type_string:str, *args, **kwargs): - """根据类型字符串生成类型的实例""" - type_ = get_type_from_string(type_string) - return type_(*args, **kwargs) - - def create_instance_ex(self, type_string:str, params: Union[Dict[str,object], object]={}): - """根据类型字符串生成类型的实例""" - - typen = get_type_from_string(type_string) - if type_string in type_symbols: - return typen(params) - if params is None or len(params) == 0: - return typen() - - # 获取构造函数参数信息 - constructor_params = inspect.signature(typen.__init__).parameters - if len(constructor_params) == 0: - return typen() - - # 准备构造函数参数 - init_args = {'args':None, 'kwargs':None} - for param_name, param in constructor_params.items(): - if param_name == 'self': - continue - if param_name in params: - init_args[param_name] = params[param_name] - elif param.default is not param.empty: - init_args[param_name] = param.default - elif param_name == 'args' or param_name == 'kwargs': - continue - else: - raise TypeError(f"Cannot instantiate type '{type_string}' without required parameter '{param_name}'") - - return typen(**init_args) - -class BaseInfo(BaseModel, any_class): - def __init__(self, **kwargs): - BaseModel.__init__(self, **kwargs) - any_class.__init__(self) - - @virtual - def __str__(self) -> str: - return self.ToString() +class BaseInfo(BaseModel): + def SymbolName(self) -> str: + return "BaseInfo" + def ToString(self) -> str: + return self.SymbolName() class MemberInfo(BaseInfo): _MemberName: str = PrivateAttr(default="") @@ -479,7 +374,7 @@ class MemberInfo(BaseInfo): def MemberName(self) -> str: return self._MemberName @property - def ParentType(self) -> type: + def ParentType(self) -> Optional[type]: return self._ParentType @property def IsStatic(self) -> bool: @@ -516,9 +411,9 @@ class ValueInfo(BaseInfo): @property def IsUnion(self) -> bool: - return is_union(self._RealType) + return IsUnion(self._RealType) @property - def RealType(self): + def RealType(self) -> type|Any: return self._RealType @property def IsCollection(self) -> bool: @@ -609,7 +504,7 @@ class ValueInfo(BaseInfo): if valueType is type(None): return True if self.IsUnion: - return any(ValueInfo(uType).Verify(valueType) for uType in get_union_types(self.RealType)) + return any(ValueInfo(uType).Verify(valueType) for uType in GetUnionTypes(self.RealType)) elif self.RealType is Any: return True elif self.RealType is type(None): @@ -623,7 +518,7 @@ class ValueInfo(BaseInfo): def DecayToList(self) -> List[Self]: result:List[Self] = [] if self.IsUnion: - for uType in get_union_types(self.RealType): + for uType in GetUnionTypes(self.RealType): result.extend(ValueInfo(uType).DecayToList()) else: result.append(self) @@ -649,7 +544,7 @@ class ValueInfo(BaseInfo): module_name: Optional[str] = None, SelfType: type|Any|None = None, **kwargs - ) -> Self: + ) -> 'ValueInfo': if GetInternalReflectionDebug(): print_colorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\ f"metaType={metaType}, SelfType={SelfType}") @@ -665,17 +560,17 @@ class ValueInfo(BaseInfo): else: return ValueInfo(metaType, **kwargs) elif isinstance(metaType, str): - type_ = try_to_type(metaType, module_name=module_name) + type_ = TryToType(metaType, module_name=module_name) if type_ is None: return ValueInfo(metaType, **kwargs) else: return ValueInfo(type_, **kwargs) - elif metaType is Self: + elif isinstance(metaType, Self)#metaType is Self: if SelfType is None: raise ReflectionException("SelfType is required when metaType is ") return ValueInfo.Create(SelfType, **kwargs) elif isinstance(metaType, TypeVar): - gargs = get_generic_args(metaType) + gargs = GetGenericArgs(metaType) if len(gargs) == 1: return ValueInfo(gargs[0], **kwargs) else: @@ -687,7 +582,7 @@ class ValueInfo(BaseInfo): elif oType is dict: return ValueInfo(dict, [get_args(metaType)[0], get_args(metaType)[1]]) elif oType is tuple: - return ValueInfo(tuple, to_list(get_args(metaType))) + return ValueInfo(tuple, list(get_args(metaType))) elif oType is set: return ValueInfo(set, [get_args(metaType)[0]]) return ValueInfo(metaType, **kwargs) @@ -721,7 +616,7 @@ class FieldInfo(MemberInfo): @property def IsUnion(self) -> bool: - return self._MetaType.IsUnion + return self.ValueType.IsUnion @property def FieldName(self) -> str: ''' @@ -729,19 +624,18 @@ class FieldInfo(MemberInfo): ''' return self.MemberName @property - def ValueType(self): + def ValueType(self) -> ValueInfo: return self._MetaType @property def FieldType(self): ''' 字段类型 ''' - return self._MetaType.RealType + return self.ValueType.RealType def Verify(self, valueType:type) -> bool: - return self._MetaType.Verify(valueType) + return self.ValueType.Verify(valueType) - @virtual def GetValue(self, obj:Any) -> Any: if self.IsStatic: return getattr(self.ParentType, self.MemberName) @@ -751,7 +645,7 @@ class FieldInfo(MemberInfo): f"{ConsoleFrontColor.RED} , parent type mismatch, expected {self.ParentType}, got {type(obj)}"\ f"{ConsoleFrontColor.RESET}") return getattr(obj, self.MemberName) - @virtual + def SetValue(self, obj:Any, value:Any) -> None: if self.IsStatic: if self.Verify(type(value)): @@ -934,7 +828,7 @@ class MethodInfo(MemberInfo): method: Callable, ctype: Optional[type] = None, module_name: Optional[str] = None - ) -> Self: + ) -> 'MethodInfo': ''' 创建MethodInfo对象 name: 方法名 @@ -999,7 +893,7 @@ class RefType(ValueInfo): _FieldInfos: List[FieldInfo] = PrivateAttr() _MethodInfos: List[MethodInfo] = PrivateAttr() _MemberNames: List[str] = PrivateAttr() - _BaseTypes: List[Self] = PrivateAttr(default=None) + _BaseTypes: List[Self] = PrivateAttr(default=[]) _initialized: bool = PrivateAttr(default=False) _BaseMemberNamesSet: Set[str] = PrivateAttr(default_factory=set) _member_cache: Dict[Tuple[str, RefTypeFlag], Optional[MemberInfo]] = PrivateAttr(default_factory=dict) @@ -1012,12 +906,12 @@ class RefType(ValueInfo): super().__init__(dict, generic_args=[metaType.keyType, metaType.valueType]) metaType = dict elif isinstance(metaType, TupleIndictaor): - super().__init__(tuple, generic_args=metaType.elementTypes) + super().__init__(tuple, generic_args=list(metaType.elementTypes)) metaType = tuple elif isinstance(metaType, SetIndictaor): super().__init__(set, generic_args=[metaType.elementType]) metaType = set - elif is_generic(metaType): + elif IsGeneric(metaType): raise NotImplementedError("Generic type is not supported") else: super().__init__(metaType) @@ -1128,7 +1022,7 @@ class RefType(ValueInfo): for name, annotation in annotations_dict.items(): if name not in self._BaseMemberNamesSet and name not in field_names and not name.startswith('__'): field_info = FieldInfo( - metaType=decay_type(annotation), + metaType=DecayType(annotation), name=name, ctype=metaType, is_static=False, @@ -1220,26 +1114,6 @@ class RefType(ValueInfo): else: raise ReflectionException(f"Method {name} not found") - def TryGetFieldValue[T](self, obj:object, lv:left_value_reference[T], name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> bool: - try: - lv.ref_value = self.GetFieldValue(obj, name, flags) - return True - except ReflectionException: - return False - def TrySetFieldValue[T](self, obj:object, name:str, value:T, flags:RefTypeFlag=RefTypeFlag.Default) -> bool: - try: - self.SetFieldValue(obj, name, value, flags) - return True - except ReflectionException: - return False - def TryInvokeMethod[T](self, obj:object, lv:left_value_reference[T], - name:str, flags:RefTypeFlag=RefTypeFlag.Default, *args, **kwargs) -> bool: - try: - lv.ref_value = self.InvokeMethod(obj, name, flags, *args, **kwargs) - return True - except ReflectionException: - return False - def CreateInstance(self, *args, **kwargs) -> object: return self.RealType(*args, **kwargs) @@ -1252,7 +1126,7 @@ class RefType(ValueInfo): @override def ToString(self) -> str: return f"RefType" - def print_str(self, verbose:bool=False, flags:RefTypeFlag=RefTypeFlag.Default) -> str: + def Print2Str(self, verbose:bool=False, flags:RefTypeFlag=RefTypeFlag.Default) -> str: fields: List[str] = [] methods: List[str] = [] for field in self.GetFields(flags): @@ -1264,8 +1138,7 @@ class RefType(ValueInfo): return f"RefType" - @sealed - def tree(self, indent:int=4) -> str: + def Print2Tree(self, indent:int=4) -> str: type_set: set = set() def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]: if currentType.IsPrimitive: @@ -1312,7 +1185,7 @@ class RefType(ValueInfo): return self.RealType == other.RealType # 添加新的优化方法,避免重复检查 - def _init_base_types_if_needed(self): + def _InitBaseTypesIfNeeded(self): """初始化基类类型,只在需要时执行""" if self._BaseTypes is None: self._BaseTypes = [] @@ -1327,7 +1200,7 @@ class RefType(ValueInfo): @functools.lru_cache(maxsize=128) def GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]: if self._BaseTypes is None: - self._init_base_types_if_needed() + self._InitBaseTypesIfNeeded() result = [] for baseType in self._BaseTypes: result.extend(baseType.GetFields(flag)) @@ -1336,7 +1209,7 @@ class RefType(ValueInfo): @functools.lru_cache(maxsize=128) def GetAllBaseFields(self) -> List[FieldInfo]: if self._BaseTypes is None: - self._init_base_types_if_needed() + self._InitBaseTypesIfNeeded() result = [] for baseType in self._BaseTypes: result.extend(baseType.GetAllFields()) @@ -1346,7 +1219,7 @@ class RefType(ValueInfo): @functools.lru_cache(maxsize=128) def GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]: if self._BaseTypes is None: - self._init_base_types_if_needed() + self._InitBaseTypesIfNeeded() result = [] for baseType in self._BaseTypes: result.extend(baseType.GetMethods(flag)) @@ -1355,7 +1228,7 @@ class RefType(ValueInfo): @functools.lru_cache(maxsize=128) def GetAllBaseMethods(self) -> List[MethodInfo]: if self._BaseTypes is None: - self._init_base_types_if_needed() + self._InitBaseTypesIfNeeded() result = [] for baseType in self._BaseTypes: result.extend(baseType.GetAllMethods()) @@ -1364,7 +1237,7 @@ class RefType(ValueInfo): @functools.lru_cache(maxsize=128) def GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]: if self._BaseTypes is None: - self._init_base_types_if_needed() + self._InitBaseTypesIfNeeded() result = [] for baseType in self._BaseTypes: result.extend(baseType.GetMembers(flag)) @@ -1373,7 +1246,7 @@ class RefType(ValueInfo): @functools.lru_cache(maxsize=128) def GetAllBaseMembers(self) -> List[MemberInfo]: if self._BaseTypes is None: - self._init_base_types_if_needed() + self._InitBaseTypesIfNeeded() result = [] for baseType in self._BaseTypes: result.extend(baseType.GetAllMembers()) @@ -1434,7 +1307,7 @@ RTypen[T] 是 T 类型的 RefType _Internal_TypeManager:Optional['TypeManager'] = None -class TypeManager(BaseModel, any_class): +class TypeManager(BaseModel): _RefTypes: Dict[type|_SpecialIndictaor, RefType] = PrivateAttr(default_factory=dict) _is_preheated: bool = PrivateAttr(default=False) _weak_refs: Dict[int, "weakref.ref[RefType]"] = PrivateAttr(default_factory=dict) # 使用真正的弱引用 @@ -1442,7 +1315,7 @@ class TypeManager(BaseModel, any_class): _string_to_type_cache: Dict[str, Any] = PrivateAttr(default_factory=dict) # 字符串到类型的缓存 @classmethod - def GetInstance(cls) -> Self: + def GetInstance(cls) -> 'TypeManager': global _Internal_TypeManager if _Internal_TypeManager is None: _Internal_TypeManager = cls() @@ -1457,7 +1330,7 @@ class TypeManager(BaseModel, any_class): # 常用的基础类型列表 common_types = [ int, float, str, bool, list, dict, tuple, set, - object, type, None.__class__, Exception, BaseModel, any_class + object, type, None.__class__, Exception, BaseModel ] # 预加载的类型和它们之间常见的关系,减少后续运行时计算 @@ -1498,12 +1371,12 @@ class TypeManager(BaseModel, any_class): # 尝试使用to_type函数 try: - return to_type(data, module_name=module_name) + return ToType(data, module_name=module_name) except Exception: pass # 尝试使用try_to_type函数作为回退 - metaType = try_to_type(data, module_name=module_name) + metaType = TryToType(data, module_name=module_name) if metaType is None or isinstance(metaType, list): metaType = data return metaType diff --git a/setup.py b/setup.py index 7b515d2..a1c89e8 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,8 @@ setup( "colorama", "pydantic", "python-docx", - "Pillow" + "Pillow", + "pydub" ], exclude_package_data={"": ["*.meta"]}, ) \ No newline at end of file