BS 0.1.0 EasySave

This commit is contained in:
2025-07-10 15:08:20 +08:00
parent 39f8320c49
commit b259b499d4
12 changed files with 197 additions and 295 deletions

View File

@@ -0,0 +1,133 @@
from .Config import *
from .Reflection import *
from abs import ABC, abstractmethod
class ISignal(ABC):
pass
class IModel(ABC):
@absstractmethod
def Save(self) -> str:
pass
@absstractmethod
def Load(self, data:str) -> None:
pass
class IConvertable[T](ABC):
@absstractmethod
def ConvertTo() -> T:
pass
class IConvertModel[T](ABC, IConventable[T], IModel):
pass
class SingletonModel[T](IModel):
_InjectInstances:Dict[type,object] = {}
@staticmethod
def GetInstance(t:Typen[T]) -> T:
return _InjectInstances[t]
@staticmethod
def SetInstance(t:Typen[T], obj:T) -> None:
_InjectInstances[t] = obj
def __init__(self, t:Typen[T]) -> None:
self.typen: type = t
@override
def Save() -> str:
return GetInstance(self.typen).Save()
def DependenceModel(IConvertModel[bool]):
def __init__(self, queries:Sequence[IConvertModel[bool]]) -> None:
self.queries:list[IConventModel] = list(queries)
def ConvertTo(self) -> bool:
for query in self.queries:
if query.ConvertTo() == False:
return False
return True
def __iter__(self):
return self.queries
def Load(self, data:str):
raise NotImplementedError()
def Save(self) -> str:
return NotImplement
class Achitecture:
@staticmethod
def FormatType(t:type) -> str:
return f"{t.__module__}::{t.__name__}"
@staticmethod
def LoadFromFormat(data:str) -> type|None:
module,name = data.split("::")
return StringWithModule2Type(module,name)
@classmethod
def InternalReset(cls) -> None:
# Register System
#region Objects Registered
class TypeQuery(IConvertModel[bool]):
def __init__(self, queryType:type) -> None:
self._quertType = quertType
@override
def ConvertTo(self) -> bool:
return self._queryType in Architectrue.Childs
def Load(self, data:str) -> None:
raise NotImplementedError()
def Save(self) -> str:
raise NotImplementedError()
_RegisterHistory: Set[type] = set()
_UncompleteTargets: Dict[type,object] = {}
_Completer: Dict[type,Action] = {}
_Dependences: Dict[type,DependenceModel] = {}
_Childs: Dict[type,object] = {}
class Registering(IConvertModel[bool]):
def __init__(self,registerSlot:type) -> None:
self._registerSlot:type = registerSlot
@override
def ConvertTo(self) -> bool:
return self._registerSlot in Architecture.Childs
@override
def Load(self,data:str) -> None:
raise InvalidOperationError()
@override
def Save(self) -> str:
raise InalidOperationError()
@classmethod
def _InternalRegisteringComplete(cls) -> bool,Set[type]:
resultSet: Set[type] = set()
stats: bool = False
for dependence in cls._Dependences.keys:
if cls._Dependences[dependence].ConventTo():
resultSet.insert(dependence)
stats = True
return stats,resultSet
@classmethod
def _InternalRegisteringUpdate(cls, internalUpdateBuffer:Set[type]):
for complete in cls._InternalUpdateBuffer:
cls._Dependences.remove(complete)
# TODO
#endregion

View File

@@ -1,111 +0,0 @@
[返回](./Runtime-README.md)
# /Convention/Runtime/EasySave
---
完整的序列化和数据持久化系统,提供易用的数据存储和读取功能
## 核心特性
### 支持的数据格式
- **EasySave - JSON** 支持多态的json格式
- **Binary** 紧凑的二进制格式
## EasySave主类
### 基本保存操作
#### 泛型保存
- `Save<T>(string key, T value)` 保存到默认文件
- `Save<T>(string key, T value, string filePath)` 保存到指定文件
- `Save<T>(string key, T value, EasySaveSettings settings)` 使用自定义设置
#### 原始数据保存
- `SaveRaw(byte[] bytes)` / `SaveRaw(string str)` 保存原始数据
- `AppendRaw(byte[] bytes)` / `AppendRaw(string str)` 追加原始数据
### 基本加载操作
#### 泛型加载
- `Load<T>(string key)` 从默认文件加载
- `Load<T>(string key, string filePath)` 从指定文件加载
- `Load<T>(string key, T defaultValue)` 带默认值的安全加载
#### 原始数据加载
- `LoadRawBytes()` / `LoadRawString()` 加载原始数据
- `LoadString(string key, string defaultValue)` 加载字符串
#### 加载到现有对象
- `LoadInto<T>(string key, T obj)` 将数据加载到现有对象
- `LoadInto(string key, string filePath, object obj)` 从指定文件加载到对象
### 序列化操作
#### 内存序列化
- `Serialize<T>(T value, EasySaveSettings settings = null)` 序列化为字节数组
- `Deserialize<T>(byte[] bytes, EasySaveSettings settings = null)` 从字节数组反序列化
- `DeserializeInto<T>(byte[] bytes, T obj)` 反序列化到现有对象
### 加密和压缩
#### 加密操作
- `EncryptBytes(byte[] bytes, string password = null)` 加密字节数组
- `DecryptBytes(byte[] bytes, string password = null)` 解密字节数组
- `EncryptString(string str, string password = null)` 加密字符串
- `DecryptString(string str, string password = null)` 解密字符串
#### 压缩操作
- `CompressBytes(byte[] bytes)` 压缩字节数组
- `DecompressBytes(byte[] bytes)` 解压字节数组
- `CompressString(string str)` 压缩字符串
- `DecompressString(string str)` 解压字符串
### 备份系统
#### 备份操作
- `CreateBackup()` / `CreateBackup(string filePath)` 创建备份
- `RestoreBackup(string filePath)` 恢复备份
#### 时间戳
- `GetTimestamp()` / `GetTimestamp(string filePath)` 获取文件时间戳
### 缓存系统
#### 缓存操作
- `StoreCachedFile()` / `StoreCachedFile(string filePath)` 存储缓存文件
- `CacheFile()` / `CacheFile(string filePath)` 缓存文件
## EasySaveSettings配置
### 枚举类型定义
#### 存储位置
```csharp
public enum Location { File, InternalMS, Cache }
```
#### 目录类型
```csharp
public enum Directory { PersistentDataPath, DataPath }
```
#### 加密类型
```csharp
public enum EncryptionType { None, AES }
```
#### 压缩类型
```csharp
public enum CompressionType { None, Gzip }
```
#### 数据格式
```csharp
public enum Format { JSON }
```
#### 引用模式
```csharp
public enum ReferenceMode { ByRef, ByValue, ByRefAndValue }
```

View File

@@ -18,8 +18,8 @@ class EasySaveSetting(BaseModel):
# TODO: refChain: bool = Field(description="是否以保留引用的方式保存", default=True)
# 文件形式与参数
# TODO: encoding: str = Field(description="编码", default="utf-8")
isBackup: bool = Field(description="是否备份", default=True)
backup_suffix: str = Field(description="备份后缀", default=".backup")
isBackup: bool = Field(description="是否备份", default=True)
backupSuffix: str = Field(description="备份后缀", default=".backup")
# 序列化/反序列化时, 如果设置了忽略字段的谓词, 则被谓词选中的字段将不会工作
# 如果设置了选择字段的谓词, 则被选中的字段才会工作
ignorePr: Optional[Callable[[FieldInfo], bool]] = Field(description="忽略字段的谓词", default=None)
@@ -139,9 +139,9 @@ class ESWriter(BaseModel):
raise FileNotFoundError(f"文件路径不存在: {result_file.GetDir()}")
if result_file.Exists() and self.setting.isBackup:
if result_file.GetDir() is not None:
backup_file = ToolFile(result_file.GetDir()) | (result_file.GetFilename(True) + self.setting.backup_suffix)
backup_file = ToolFile(result_file.GetDir()) | (result_file.GetFilename(True) + self.setting.backupSuffix)
else:
backup_file = ToolFile(result_file.GetFilename(True) + self.setting.backup_suffix)
backup_file = ToolFile(result_file.GetFilename(True) + self.setting.backupSuffix)
result_file.Copy(backup_file)
try:
self.Serialize(result_file, TypeManager.GetInstance().CreateOrGetRefType(rinstance), rinstance)
@@ -192,7 +192,6 @@ class ESReader(BaseModel):
f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}")
return TypeManager.GetInstance().CreateOrGetRefType(typen)
@sealed
def _DoJsonDeserialize(self, read_file:ToolFile, rtype:Optional[RefType] = None) -> Any:
'''
反序列化: json格式

View File

@@ -1,88 +0,0 @@
[返回](./Runtime-README.md)
# /Convention/Runtime/File
---
文件操作工具模块,提供跨平台的文件和目录操作功能
## ToolFile类
### 构造与基本信息
- `ToolFile(string path)` 从路径创建文件对象
- `ToString()` / `GetFullPath()` 获取完整路径
- `GetName(bool is_ignore_extension = false)` 获取文件名
- `GetExtension()` 获取文件扩展名
### 路径操作
- `ToolFile operator |(ToolFile left, string rightPath)` 路径连接操作符
- `Open(string path)` 打开指定路径
- `Open(FileMode mode)` 以指定模式打开文件
- `Close()` 关闭文件流
- `BackToParentDir()` 回到父目录
- `GetParentDir()` 获取父目录
### 存在性检查
- `Exists()` 检查文件或目录是否存在
- `implicit operator bool` 隐式布尔转换等同于Exists()
### 类型判断
- `IsDir()` 是否为目录
- `IsFile()` 是否为文件
- `IsFileEmpty()` 文件是否为空
### 文件加载
支持多种格式的文件读取:
#### JSON操作
- `LoadAsRawJson<T>()` 原始JSON反序列化
- `LoadAsJson<T>(string key = "data")` 使用EasySave加载JSON
#### 文本操作
- `LoadAsText()` 加载为文本字符串
#### 二进制操作
- `LoadAsBinary()` 加载为字节数组
### 文件保存
支持多种格式的文件写入:
#### JSON保存
- `SaveAsRawJson<T>(T data)` 原始JSON序列化保存
- `SaveAsJson<T>(T data, string key)` 使用EasySave保存JSON
#### 二进制保存
- `SaveAsBinary(byte[] data)` 保存字节数组
- `SaveDataAsBinary(string path, byte[] outdata, FileStream Stream = null)` 静态二进制保存
### 文件操作
- `Create()` 创建文件或目录
- `Rename(string newPath)` 重命名
- `Move(string path)` 移动文件
- `Copy(string path, out ToolFile copyTo)` 复制文件
- `Delete()` / `Remove()` 删除文件或目录
- `Refresh()` 刷新文件信息
### 路径管理
- `MustExistsPath()` 确保路径存在(自动创建)
- `TryCreateParentPath()` 尝试创建父路径
### 目录操作
- `DirIter()` 遍历目录获取字符串列表
- `DirToolFileIter()` 遍历目录获取ToolFile列表
- `DirCount()` 获取目录内项目数量
- `DirClear()` 清空目录内容
- `MakeFileInside(string source, bool isDeleteSource = false)` 在目录内创建文件
### 文件对话框(平台相关)
- `SelectMultipleFiles(string filter, string title)` 多文件选择对话框
- `SelectFile(string filter, string title)` 单文件选择对话框
- `SaveFile(string filter, string title)` 保存文件对话框
- `SelectFolder(string description)` 文件夹选择对话框
### 文件浏览
- `BrowseFile(params string[] extensions)` 浏览指定扩展名的文件
- `BrowseToolFile(params string[] extensions)` 浏览并返回ToolFile对象
### 时间戳
- `GetTimestamp()` 获取文件时间戳

View File

@@ -67,32 +67,17 @@ class PermissionError(FileOperationError):
"""权限操作异常"""
pass
from pydantic import BaseModel, GetCoreSchemaHandler
from pydantic import BaseModel, GetCoreSchemaHandler, Field
from pydantic_core import core_schema
class ToolFile(BaseModel):
OriginFullPath:str
@classmethod
def __get_pydantic_core_schema__(
cls,
_source_type: Any,
_handler: GetCoreSchemaHandler,
) -> core_schema.CoreSchema:
return core_schema.no_info_after_validator_function(
cls,
core_schema.any_schema(),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda instance: None
),
)
def __init__(
self,
filePath: Union[str, Self],
):
self.OriginFullPath = filePath if isinstance(filePath, str) else filePath.OriginFullPath
super().__init__(OriginFullPath=os.path.abspath(os.path.expandvars(str(filePath))))
def __del__(self):
pass
def __str__(self):
@@ -167,7 +152,7 @@ class ToolFile(BaseModel):
def Copy(self, targetPath:Optional[Union[Self, str]]=None):
if targetPath is None:
return ToolFile(self.OriginFullPath)
if self.Exists() is False:
if self.Exists() == False:
raise FileNotFoundError("file not found")
target_file = ToolFile(str(targetPath))
if target_file.IsDir():
@@ -195,69 +180,35 @@ class ToolFile(BaseModel):
return self
def LoadAsJson(self) -> Any:
if self.Exists() is False or 'w' in self.OriginFullPath:
with open(self.OriginFullPath, 'r') as f:
json_data = json.load(f)
return json_data
else:
raise FileNotFoundError("file not found")
with open(self.OriginFullPath, 'r') as f:
json_data = json.load(f)
return json_data
def LoadAsCsv(self) -> pd.DataFrame:
if self.Exists() is False or 'w' in self.OriginFullPath:
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
else:
raise FileNotFoundError("file not found")
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
def LoadAsXml(self) -> pd.DataFrame:
if self.Exists() is False or 'w' in self.OriginFullPath:
with open(self.OriginFullPath, 'r') as f:
return pd.read_xml(f)
else:
raise FileNotFoundError("file not found")
with open(self.OriginFullPath, 'r') as f:
return pd.read_xml(f)
def LoadAsDataframe(self) -> pd.DataFrame:
if self.Exists() is False or 'w' in self.OriginFullPath:
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
else:
raise FileNotFoundError("file not found")
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
def LoadAsExcel(self) -> pd.DataFrame:
if self.Exists() is False or 'w' in self.OriginFullPath:
with open(self.OriginFullPath, 'r') as f:
return pd.read_excel(f)
else:
raise FileNotFoundError("file not found")
with open(self.OriginFullPath, 'r') as f:
return pd.read_excel(f)
def LoadAsBinary(self) -> bytes:
if self.Exists() is False or 'w' in self.OriginFullPath:
with open(self.OriginFullPath, 'rb') as f:
return f.read()
else:
raise FileNotFoundError("file not found")
with open(self.OriginFullPath, 'rb') as f:
return f.read()
def LoadAsText(self) -> str:
if self.Exists() is False or 'w' in self.OriginFullPath:
with open(self.OriginFullPath, 'r') as f:
return f.read()
else:
raise FileNotFoundError("file not found")
with open(self.OriginFullPath, 'r') as f:
return f.read()
def LoadAsWav(self):
if self.Exists() is False or 'w' in self.OriginFullPath:
return AudioSegment.from_wav(self.OriginFullPath)
else:
raise FileNotFoundError("file not found")
return AudioSegment.from_wav(self.OriginFullPath)
def LoadAsAudio(self):
if self.Exists() is False or 'w' in self.OriginFullPath:
return AudioSegment.from_file(self.OriginFullPath)
else:
raise FileNotFoundError("file not found")
return AudioSegment.from_file(self.OriginFullPath)
def LoadAsImage(self) -> ImageFile.ImageFile:
if self.Exists() is False or 'w' in self.OriginFullPath:
return Image.open(self.OriginFullPath)
else:
raise FileNotFoundError("file not found")
return Image.open(self.OriginFullPath)
def LoadAsDocx(self) -> DocumentObject:
if self.Exists() is False or 'w' in self.OriginFullPath:
return Document(self.OriginFullPath)
else:
raise FileNotFoundError("file not found")
return Document(self.OriginFullPath)
def LoadAsUnknown(self, suffix:str) -> Any:
return self.LoadAsText()
def LoadAsModel(self, model:type[BaseModel]) -> BaseModel:

View File

@@ -1,7 +0,0 @@
[返回](./Runtime-README.md)
# /Convention/Runtime/Math
---
数学工具模块,提供常用数学计算功能

View File

@@ -1,7 +0,0 @@
[返回](./Runtime-README.md)
# /Convention/Runtime/Plugins
---
插件系统模块,提供扩展机制和平台特定功能

View File

@@ -87,7 +87,7 @@ def String2Type(type_string:str) -> type:
return result
raise TypeError(f"Cannot find type '{type_string}', type_string is <{type_string}>")
@functools.lru_cache(maxsize=256)
@functools.lru_cache(maxsize=1024)
def StringWithModel2Type(type_string:str, module_name:str) -> type|None:
'''
根据字符串生成类型,带模块名参数,使用缓存
@@ -565,7 +565,7 @@ class ValueInfo(BaseInfo):
return ValueInfo(metaType, **kwargs)
else:
return ValueInfo(type_, **kwargs)
elif isinstance(metaType, Self)#metaType is Self:
elif isinstance(metaType, Self):#metaType is Self:
if SelfType is None:
raise ReflectionException("SelfType is required when metaType is <Self>")
return ValueInfo.Create(SelfType, **kwargs)

View File

@@ -1,5 +1,5 @@
from pathlib import Path
from Config import *
from .Config import *
import re
from pathlib import Path
import xml.etree.ElementTree as ET
@@ -47,13 +47,13 @@ def Bytes2String(lines:List[bytes], encoding='utf-8') -> str:
return "".join(Bytes2Strings(lines, encoding))
def word_segmentation(
sentence: Union[str, light_str, Any],
sentence,
cut_all: bool = False,
HMM: bool = True,
use_paddle: bool = False
) -> Sequence[Optional[Union[Any, str]]]:
try:
import jieba
return jieba.dt.cut(UnWrapper(sentence), cut_all=cut_all, HMM=HMM, use_paddle=use_paddle)
return jieba.dt.cut(str(sentence), cut_all=cut_all, HMM=HMM, use_paddle=use_paddle)
except ImportError:
raise ValueError("jieba is not install")

20
[Test]/test.json Normal file
View File

@@ -0,0 +1,20 @@
{
"easy": {
"__type": "__main__.test_log, Global",
"value": {
"__type": "__main__.test_log, Global",
"model_computed_fields": {
"__type": "typing.Any, Global"
},
"model_extra": null,
"model_fields": {
"__type": "typing.Any, Global"
},
"model_fields_set": {
"__type": "typing.Any, Global"
},
"test_field": 1,
"test_field_2": "test"
}
}
}

View File

@@ -3,9 +3,18 @@ import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Convention.Runtime.Config import *
from Convention.Runtime.EasySave import *
class test_log(BaseModel):
test_field:int
test_field_2:str
def run():
print_colorful(ConsoleFrontColor.RED,"test")
SetInternalDebug(True)
SetInternalReflectionDebug(True)
SetInternalEasySaveDebug(True)
test = test_log(test_field=1,test_field_2="test")
EasySave.Write(test,"test.json")
if __name__ == "__main__":
run()

3
[Test]/test0.py Normal file
View File

@@ -0,0 +1,3 @@
import math
import r
print(re.findall(r"\d+[.\d]?", "xxxxx$19.99"))