Compare commits

..

9 Commits

Author SHA1 Message Date
007db5a06b Merge branch 'main' of http://www.liubai.site:3000/ninemine/Convention-Python 2025-10-14 19:29:03 +08:00
c11469a108 ToolFile+AbsPath 2025-10-14 19:26:05 +08:00
81209e85ee 增强ToolFile对斜杠的判断与相关行为 2025-10-14 17:09:04 +08:00
97c57f65df Update ToolFile 2025-10-13 17:01:21 +08:00
5b38b6239e Update ToolFile 2025-10-13 16:25:19 +08:00
f146d241eb 1.File新增逐行读取迭代2.Rename PrintColorful 2025-10-13 15:39:29 +08:00
7ff00a8ab9 放弃EP Visual 2025-09-30 14:49:39 +08:00
b02eafcb35 EP Interaction 2025-09-30 10:40:58 +08:00
ecaab13948 EP Interaction 2025-09-29 16:22:10 +08:00
11 changed files with 970 additions and 3173 deletions

View File

@@ -8,7 +8,7 @@ import datetime
try: try:
from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle
except: except:
print("colorama is not installed, using default colors") print("colorama is not installed")
class ConsoleFrontColor: class ConsoleFrontColor:
RED = "" RED = ""
GREEN = "" GREEN = ""
@@ -60,7 +60,7 @@ def GetInternalDebug() -> bool:
global INTERNAL_DEBUG global INTERNAL_DEBUG
return INTERNAL_DEBUG return INTERNAL_DEBUG
def print_colorful(color:str, *args, is_reset:bool=True, **kwargs): def PrintColorful(color:str, *args, is_reset:bool=True, **kwargs):
with lock_guard(): with lock_guard():
if is_reset: if is_reset:
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs) print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
@@ -328,6 +328,12 @@ class PlatformIndicator:
CompanyName : str = "DefaultCompany" CompanyName : str = "DefaultCompany"
ProductName : str = "DefaultProject" ProductName : str = "DefaultProject"
@staticmethod
def GetFileSeparator(is_not_this_platform:bool = False) -> str:
if PlatformIndicator.IsPlatformWindows and not is_not_this_platform:
return "\\"
return "/"
@staticmethod @staticmethod
def GetApplicationPath() -> str: def GetApplicationPath() -> str:
"""获取应用程序所在目录""" """获取应用程序所在目录"""

View File

@@ -180,14 +180,14 @@ class ESReader(BaseModel):
''' '''
#module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.') #module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.')
#if GetInternalEasySaveDebug(): #if GetInternalEasySaveDebug():
# print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\ # PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
# f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\ # f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\
# f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}") # f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}")
#typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name) #typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name)
#return TypeManager.GetInstance().CreateOrGetRefType(typen_to) #return TypeManager.GetInstance().CreateOrGetRefType(typen_to)
typen, assembly_name = ReadAssemblyTypen(type_label) typen, assembly_name = ReadAssemblyTypen(type_label)
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\ f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\
f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}") f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}")
return TypeManager.GetInstance().CreateOrGetRefType(typen) return TypeManager.GetInstance().CreateOrGetRefType(typen)
@@ -235,7 +235,7 @@ class ESReader(BaseModel):
if rtype is None: if rtype is None:
raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}") raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}")
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\
f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}") f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}")
# 处理值类型 # 处理值类型
@@ -278,7 +278,7 @@ class ESReader(BaseModel):
else: else:
rinstance = rtype.CreateInstance() rinstance = rtype.CreateInstance()
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\
f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}") f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}")
fields:List[FieldInfo] = self._GetFields(rtype) fields:List[FieldInfo] = self._GetFields(rtype)
for field in fields: for field in fields:
@@ -289,19 +289,19 @@ class ESReader(BaseModel):
if field.FieldType == list and field.ValueType.IsGeneric: if field.FieldType == list and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == set and field.ValueType.IsGeneric: elif field.FieldType == set and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == tuple and field.ValueType.IsGeneric: elif field.FieldType == tuple and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == dict and field.ValueType.IsGeneric: elif field.FieldType == dict and field.ValueType.IsGeneric:
@@ -309,13 +309,13 @@ class ESReader(BaseModel):
DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1]) DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1])
) )
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\
f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>") f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>")
else: else:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\
f"<{field_rtype.GenericArgs}>") f"<{field_rtype.GenericArgs}>")
field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName])) field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName]))

View File

@@ -1,10 +1,8 @@
import os.path
from .Config import * from .Config import *
import json import json
import shutil import shutil
import pandas as pd
import os import os
import sys
import pickle
import zipfile import zipfile
import tarfile import tarfile
import base64 import base64
@@ -14,21 +12,6 @@ import datetime
import stat import stat
from typing import * from typing import *
from pathlib import Path from pathlib import Path
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
from .String import Bytes2String
def GetExtensionName(file:str): def GetExtensionName(file:str):
return os.path.splitext(file)[1][1:] return os.path.splitext(file)[1][1:]
@@ -67,8 +50,10 @@ class PermissionError(FileOperationError):
"""权限操作异常""" """权限操作异常"""
pass pass
from pydantic import BaseModel, GetCoreSchemaHandler, Field try:
from pydantic_core import core_schema from pydantic import BaseModel
except ImportError as e:
ImportingThrow(e, "File", ["pydantic"])
class ToolFile(BaseModel): class ToolFile(BaseModel):
OriginFullPath:str OriginFullPath:str
@@ -78,7 +63,7 @@ class ToolFile(BaseModel):
filePath: Union[str, Self], filePath: Union[str, Self],
): ):
filePath = os.path.expandvars(str(filePath)) filePath = os.path.expandvars(str(filePath))
if filePath[1:].startswith(":/") or filePath[1:].startswith(":\\"): if ":" in filePath:
filePath = os.path.abspath(filePath) filePath = os.path.abspath(filePath)
super().__init__(OriginFullPath=filePath) super().__init__(OriginFullPath=filePath)
def __del__(self): def __del__(self):
@@ -92,15 +77,21 @@ class ToolFile(BaseModel):
def __or__(self, other): def __or__(self, other):
if other is None: if other is None:
return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}\\") return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}{PlatformIndicator.GetFileSeparator()}")
else: else:
# 不使用os.path.join因为os.path.join存在如下机制 # 不使用os.path.join因为os.path.join存在如下机制
# 当参数路径中存在绝对路径风格时,会忽略前面的参数,例如: # 当参数路径中存在绝对路径风格时,会忽略前面的参数,例如:
# os.path.join("E:/dev", "/analyze/") = "E:/analyze/" # os.path.join("E:/dev", "/analyze/") = "E:/analyze/"
# 而我们需要的是 "E:/dev/analyze" # 而我们需要的是 "E:/dev/analyze"
first = self.GetFullPath().replace('/','\\').strip('\\') separator = PlatformIndicator.GetFileSeparator()
second = str(other).replace('/','\\') separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
return ToolFile(f"{first}\\{second}") first = self.GetFullPath().replace(separator_not_this_platform,separator).strip(separator)
second = str(other).replace(separator_not_this_platform,separator)
if first == "./":
return ToolFile(f"{second}")
elif first == "../":
first = ToolFile(f"{os.path.abspath(first)}").BackToParentDir()
return ToolFile(f"{first}{separator}{second}")
def __idiv__(self, other): def __idiv__(self, other):
temp = self.__or__(other) temp = self.__or__(other)
self.OriginFullPath = temp.GetFullPath() self.OriginFullPath = temp.GetFullPath()
@@ -122,16 +113,19 @@ class ToolFile(BaseModel):
# 获取比较对象的路径 # 获取比较对象的路径
other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other) other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other)
self_path = self.OriginFullPath self_path = self.OriginFullPath
separator = PlatformIndicator.GetFileSeparator()
separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
# 如果两个文件都存在,则直接比较路径 # 如果两个文件都存在,则直接比较路径
if self.Exists() == True and other.Exists() == True: if self.Exists() == True and other.Exists() == True:
return self_path.strip('\\/') == other_path.strip('\\/') return self_path.strip(separator_not_this_platform) == other_path.strip(separator_not_this_platform)
# 如果一个文件存在另一个不被判定为存在则一定不同 # 如果一个文件存在另一个不被判定为存在则一定不同
elif self.Exists() != other.Exists(): elif self.Exists() != other.Exists():
return False return False
# 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串 # 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串
else: else:
return self_path.replace('/','\\') == other_path.replace('/','\\') return self_path.replace(separator_not_this_platform,separator) == other_path.replace(separator_not_this_platform,separator)
def ToPath(self): def ToPath(self):
return Path(self.OriginFullPath) return Path(self.OriginFullPath)
@@ -181,7 +175,7 @@ class ToolFile(BaseModel):
if self.Exists() is False: if self.Exists() is False:
raise FileNotFoundError("file not found") raise FileNotFoundError("file not found")
newpath = str(newpath) newpath = str(newpath)
if '\\' in newpath or '/' in newpath: if PlatformIndicator.GetFileSeparator() in newpath or PlatformIndicator.GetFileSeparator(True) in newpath:
newpath = GetBaseFilename(newpath) newpath = GetBaseFilename(newpath)
new_current_path = os.path.join(self.GetDir(), newpath) new_current_path = os.path.join(self.GetDir(), newpath)
os.rename(self.OriginFullPath, new_current_path) os.rename(self.OriginFullPath, new_current_path)
@@ -192,16 +186,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r', encoding=encoding) as f: with open(self.OriginFullPath, 'r', encoding=encoding) as f:
json_data = json.load(f, **kwargs) json_data = json.load(f, **kwargs)
return json_data return json_data
def LoadAsCsv(self) -> pd.DataFrame: def LoadAsCsv(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f) return pd.read_csv(f)
def LoadAsXml(self) -> pd.DataFrame: def LoadAsXml(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_xml(f) return pd.read_xml(f)
def LoadAsDataframe(self) -> pd.DataFrame: def LoadAsDataframe(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f) return pd.read_csv(f)
def LoadAsExcel(self) -> pd.DataFrame: def LoadAsExcel(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_excel(f) return pd.read_excel(f)
def LoadAsBinary(self) -> bytes: def LoadAsBinary(self) -> bytes:
@@ -211,18 +221,103 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return f.read() return f.read()
def LoadAsWav(self): def LoadAsWav(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_wav(self.OriginFullPath) return AudioSegment.from_wav(self.OriginFullPath)
def LoadAsAudio(self): def LoadAsAudio(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_file(self.OriginFullPath) return AudioSegment.from_file(self.OriginFullPath)
def LoadAsImage(self) -> ImageFile.ImageFile: def LoadAsImage(self):
try:
from PIL import Image
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
return Image.open(self.OriginFullPath) return Image.open(self.OriginFullPath)
def LoadAsDocx(self) -> DocumentObject: def LoadAsDocx(self) -> "docx.document.Document":
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
return Document(self.OriginFullPath) return Document(self.OriginFullPath)
def LoadAsUnknown(self, suffix:str) -> Any: def LoadAsUnknown(self, suffix:str) -> Any:
return self.LoadAsText() return self.LoadAsText()
def LoadAsModel(self, model:type[BaseModel]) -> BaseModel: def LoadAsModel(self, model:type["BaseModel"]) -> "BaseModel":
return model.model_validate(self.LoadAsJson()) return model.model_validate(self.LoadAsJson())
def ReadLines(self, **kwargs):
with open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = f.readline()
if not line or line == '':
break
yield line
async def ReadLinesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = await f.readline()
if not line or line == '':
break
yield line
def ReadBytes(self, **kwargs):
with open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = f.read(1024)
if not data or data == '':
break
yield data
async def ReadBytesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = await f.read(1024)
if not data or data == '':
break
yield data
def WriteBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'wb', **kwargs) as f:
f.write(data)
async def WriteBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'wb', **kwargs) as f:
await f.write(data)
def WriteLines(self, data:List[str], **kwargs):
with open(self.OriginFullPath, 'w', **kwargs) as f:
f.writelines(data)
async def WriteLinesAsync(self, data:List[str], **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'w', **kwargs) as f:
await f.writelines(data)
def AppendText(self, data:str, **kwargs):
with open(self.OriginFullPath, 'a', **kwargs) as f:
f.write(data)
async def AppendTextAsync(self, data:str, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'a', **kwargs) as f:
await f.write(data)
def AppendBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'ab', **kwargs) as f:
f.write(data)
async def AppendBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'ab', **kwargs) as f:
await f.write(data)
def SaveAsJson(self, json_data): def SaveAsJson(self, json_data):
try: try:
from pydantic import BaseModel from pydantic import BaseModel
@@ -234,16 +329,40 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w', encoding='utf-8') as f: with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=4) json.dump(json_data, f, indent=4)
return self return self
def SaveAsCsv(self, csv_data:pd.DataFrame): def SaveAsCsv(self, csv_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
csv_data.to_csv(self.OriginFullPath) csv_data.to_csv(self.OriginFullPath)
return self return self
def SaveAsXml(self, xml_data:pd.DataFrame): def SaveAsXml(self, xml_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
xml_data.to_xml(self.OriginFullPath) xml_data.to_xml(self.OriginFullPath)
return self return self
def SaveAsDataframe(self, dataframe_data:pd.DataFrame): def SaveAsDataframe(self, dataframe_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
dataframe_data.to_csv(self.OriginFullPath) dataframe_data.to_csv(self.OriginFullPath)
return self return self
def SaveAsExcel(self, excel_data:pd.DataFrame): def SaveAsExcel(self, excel_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
excel_data.to_excel(self.OriginFullPath, index=False) excel_data.to_excel(self.OriginFullPath, index=False)
return self return self
def SaveAsBinary(self, binary_data:bytes): def SaveAsBinary(self, binary_data:bytes):
@@ -254,13 +373,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w') as f: with open(self.OriginFullPath, 'w') as f:
f.writelines(text_data) f.writelines(text_data)
return self return self
def SaveAsAudio(self, audio_data:AudioSegment): def SaveAsAudio(self, audio_data:"AudioSegment"):
'''
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
'''
audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath)) audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath))
return self return self
def SaveAsImage(self, image_data:ImageFile.ImageFile): def SaveAsImage(self, image_data:"ImageFile.ImageFile"):
'''
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
'''
image_data.save(self.OriginFullPath) image_data.save(self.OriginFullPath)
return self return self
def SaveAsDocx(self, docx_data:DocumentObject): def SaveAsDocx(self, docx_data:"DocumentObject"):
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
docx_data.save(self.OriginFullPath) docx_data.save(self.OriginFullPath)
return self return self
def SaveAsUnknown(self, unknown_data:Any): def SaveAsUnknown(self, unknown_data:Any):
@@ -276,6 +414,8 @@ class ToolFile(BaseModel):
return os.path.getsize(self.OriginFullPath) return os.path.getsize(self.OriginFullPath)
def GetExtension(self): def GetExtension(self):
return GetExtensionName(self.OriginFullPath) return GetExtensionName(self.OriginFullPath)
def GetAbsPath(self) -> str:
return os.path.abspath(self.OriginFullPath)
def GetFullPath(self) -> str: def GetFullPath(self) -> str:
return self.OriginFullPath return self.OriginFullPath
def GetFilename(self, is_without_extension = False): def GetFilename(self, is_without_extension = False):
@@ -285,7 +425,7 @@ class ToolFile(BaseModel):
''' '''
if is_without_extension and '.' in self.OriginFullPath: if is_without_extension and '.' in self.OriginFullPath:
return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)] return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)]
elif self.OriginFullPath[-1] == '/' or self.OriginFullPath[-1] == '\\': elif self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator(True):
return GetBaseFilename(self.OriginFullPath[:-1]) return GetBaseFilename(self.OriginFullPath[:-1])
else: else:
return GetBaseFilename(self.OriginFullPath) return GetBaseFilename(self.OriginFullPath)
@@ -297,7 +437,7 @@ class ToolFile(BaseModel):
return os.path.dirname(self.OriginFullPath) return os.path.dirname(self.OriginFullPath)
def IsDir(self): def IsDir(self):
if self.OriginFullPath[-1] == '\\' or self.GetFullPath()[-1] == '/': if self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.GetFullPath()[-1] == PlatformIndicator.GetFileSeparator(True):
return True return True
else: else:
return os.path.isdir(self.OriginFullPath) return os.path.isdir(self.OriginFullPath)
@@ -646,8 +786,11 @@ class ToolFile(BaseModel):
ignore_directories: 是否忽略目录事件 ignore_directories: 是否忽略目录事件
case_sensitive: 是否区分大小写 case_sensitive: 是否区分大小写
""" """
from watchdog.observers import Observer try:
from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
except ImportError as e:
ImportingThrow(e, "File", ["watchdog"])
if not self.Exists(): if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}") raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
@@ -1008,32 +1151,4 @@ class ToolFile(BaseModel):
是否隐藏 是否隐藏
""" """
return self.get_permissions()['hidden'] return self.get_permissions()['hidden']
def split_elements(
file: Union[ToolFile, str],
*,
ratios: List[float] = [1,1],
pr: Optional[Callable[[ToolFile], bool]] = None,
shuffler: Optional[Callable[[List[ToolFile]], None]] = None,
output_dirs: Optional[List[ToolFile]] = None,
output_must_exist: bool = True,
output_callback: Optional[Callable[[ToolFile], None]] = None
) -> List[List[ToolFile]]:
result: List[List[ToolFile]] = tool_split_elements(WrapperFile(file).dir_tool_file_iter(),
ratios=ratios,
pr=pr,
shuffler=shuffler)
if output_dirs is None:
return result
for i in range(min(len(output_dirs), len(result))):
output_dir: ToolFile = output_dirs[i]
if output_dir.IsDir() is False:
raise Exception("Outputs must be directory")
if output_must_exist:
output_dir.must_exists_as_new()
for file in result[i]:
current = output_dirs[i].MakeFileInside(file)
if output_callback:
output_callback(current)
return result

View File

@@ -0,0 +1,743 @@
from .Config import *
from .File import ToolFile
from .Web import ToolURL
import json
import urllib.parse
import os
from typing import *
try:
from pydantic import BaseModel, PrivateAttr, Field
except ImportError as e:
ImportingThrow(e, "Interaction", ["pydantic"])
try:
import aiofiles
except ImportError as e:
ImportingThrow(e, "Interaction", ["aiofiles"])
class InteractionError(Exception):
"""交互操作异常基类"""
pass
class PathValidationError(InteractionError):
"""路径验证异常"""
pass
class LoadError(InteractionError):
"""加载异常"""
pass
class SaveError(InteractionError):
"""保存异常"""
pass
class Interaction(BaseModel):
"""统一的文件交互类,自适应处理本地文件和网络文件"""
originPath: str
_is_url: bool = PrivateAttr(False)
_is_local: bool = PrivateAttr(False)
_tool_file: Optional[ToolFile] = PrivateAttr(None)
_tool_url: Optional[ToolURL] = PrivateAttr(None)
def __init__(self, path):
"""
从路径字符串创建对象自动识别本地文件或网络URL
Args:
path: 路径字符串或是可以转换为路径字符串的对象
"""
super().__init__(originPath=str(path))
# 自动识别路径类型
self._detect_path_type()
def _detect_path_type(self):
"""自动检测路径类型"""
path = self.originPath.strip()
# 检查是否为HTTP/HTTPS URL
if path.startswith(('http://', 'https://', 'file://')):
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(path)
return
# 检查是否为localhost URL
if path.startswith('localhost'):
# 转换为完整的HTTP URL
if not path.startswith('localhost:'):
# 默认端口80
full_url = f"http://{path}"
else:
full_url = f"http://{path}"
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(full_url)
self.originPath = full_url
return
# 检查是否为绝对路径或相对路径
if (os.path.isabs(path) or
path.startswith('./') or
path.startswith('../') or
':' in path[:3]): # Windows盘符
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
return
# 默认作为相对路径处理
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
def __str__(self) -> str:
"""隐式字符串转换"""
return self.originPath
def __bool__(self) -> bool:
"""隐式布尔转换,检查路径是否有效"""
return self.IsValid
@property
def IsValid(self) -> bool:
"""检查路径是否有效"""
if self._is_url:
return self._tool_url.IsValid if self._tool_url else False
else:
return self._tool_file.Exists() if self._tool_file else False
@property
def IsURL(self) -> bool:
"""是否为网络URL"""
return self._is_url
@property
def IsLocal(self) -> bool:
"""是否为本地文件"""
return self._is_local
@property
def IsFile(self) -> bool:
"""是否为文件对于URL检查是否存在文件名"""
if self._is_url:
return bool(self._tool_url.GetFilename()) if self._tool_url else False
else:
return self._tool_file.IsFile() if self._tool_file else False
@property
def IsDir(self) -> bool:
"""是否为目录(仅对本地路径有效)"""
if self._is_local:
return self._tool_file.IsDir() if self._tool_file else False
return False
def GetFilename(self) -> str:
"""获取文件名"""
if self._is_url:
return self._tool_url.GetFilename() if self._tool_url else ""
else:
return self._tool_file.GetFilename() if self._tool_file else ""
def GetExtension(self) -> str:
"""获取文件扩展名"""
if self._is_url:
return self._tool_url.GetExtension() if self._tool_url else ""
else:
return self._tool_file.GetExtension() if self._tool_file else ""
def ExtensionIs(self, *extensions: str) -> bool:
"""检查扩展名是否匹配"""
if self._is_url:
return self._tool_url.ExtensionIs(*extensions) if self._tool_url else False
else:
current_ext = self.GetExtension()
return current_ext.lower() in [ext.lower().lstrip('.') for ext in extensions]
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
def Open(self, path: str) -> 'Interaction':
"""在当前对象上打开新路径"""
new_obj = Interaction(path)
self.originPath = new_obj.originPath
self._is_url = new_obj._is_url
self._is_local = new_obj._is_local
self._tool_file = new_obj._tool_file
self._tool_url = new_obj._tool_url
return self
# 同步加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsText()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsText()
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsBinary()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsBinary()
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsJson(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
json_data = self._tool_file.LoadAsJson()
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
# 异步加载方法
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsTextAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'r', encoding='utf-8') as f:
return await f.read()
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsBinaryAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'rb') as f:
return await f.read()
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsJsonAsync(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地JSON文件
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise LoadError(f"Failed to parse JSON from {self.originPath}: {str(e)}")
# 同步保存方法
def SaveAsText(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL先下载然后保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsText(content)
return self
def SaveAsBinary(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsBinary(content)
return self
def SaveAsJson(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(data)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsJson(data)
return self
# 异步保存方法
async def SaveAsTextAsync(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
return self
async def SaveAsBinaryAsync(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'wb') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'wb') as f:
await f.write(content)
return self
async def SaveAsJsonAsync(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
# 序列化JSON数据
try:
from pydantic import BaseModel
if isinstance(data, BaseModel):
json_data = data.model_dump()
json_data["__type"] = f"{data.__class__.__name__}, pydantic.BaseModel"
else:
json_data = data
json_content = json.dumps(json_data, indent=4, ensure_ascii=False)
except Exception as e:
raise SaveError(f"Failed to serialize JSON data: {str(e)}")
# 保存JSON内容
return await self.SaveAsTextAsync(json_content, local_path)
# HTTP请求方法仅对URL有效
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Get(callback)
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Post(callback, form_data)
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.GetAsync(callback)
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.PostAsync(callback, form_data)
# 便利方法
def Save(self, local_path: Optional[str] = None) -> 'Interaction':
"""
自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL先下载内容再保存
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
self._tool_url.Save(local_path)
return self
async def SaveAsync(self, local_path: Optional[str] = None) -> 'Interaction':
"""
异步自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL异步下载内容
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
await self.SaveAsTextAsync(content, local_path)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
await self.SaveAsJsonAsync(content, local_path)
else:
content = await self.LoadAsBinaryAsync()
await self.SaveAsBinaryAsync(content, local_path)
except Exception as e:
raise SaveError(f"Failed to save {self.originPath}: {str(e)}")
return self
def Downloadable(self) -> bool:
"""检查是否可下载"""
return self._is_url and self._tool_url.IsValid if self._tool_url else False
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("Download method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.Download(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("DownloadAsync method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.DownloadAsync(local_path)
def Copy(self, target_path) -> ToolFile:
"""
复制文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
新的Interaction对象
"""
if not self._is_local:
raise InteractionError("Copy method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Copy(str(target_path))
def Move(self, target_path) -> ToolFile:
"""
移动文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
更新后的Interaction对象
"""
if not self._is_local:
raise InteractionError("Move method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Move(str(target_path))
def Remove(self) -> 'Interaction':
"""
删除文件(仅对本地文件有效)
Returns:
Interaction对象本身
"""
if not self._is_local:
raise InteractionError("Remove method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.Remove()
return self
def Exists(self) -> bool:
"""
检查文件是否存在
Returns:
是否存在
"""
return self.IsValid
def GetSize(self) -> int:
"""
获取文件大小(仅对本地文件有效)
Returns:
文件大小(字节)
"""
if not self._is_local:
raise InteractionError("GetSize method is only available for local files")
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.GetSize()
def GetDir(self) -> str:
"""
获取目录路径
Returns:
目录路径
"""
if self._is_local:
return self._tool_file.GetDir() if self._tool_file else ""
else:
# 对于URL返回基础URL
if self._tool_url:
parsed = urllib.parse.urlparse(self._tool_url.url)
return f"{parsed.scheme}://{parsed.netloc}"
return ""
def GetParentDir(self) -> 'Interaction':
"""
获取父目录的Interaction对象
Returns:
父目录的Interaction对象
"""
if self._is_local:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
parent_dir = self._tool_file.GetParentDir()
return Interaction(parent_dir.GetFullPath())
else:
# 对于URL返回基础URL
base_url = self.GetDir()
return Interaction(base_url)
def ToString(self) -> str:
"""获取完整路径"""
return self.originPath
def GetFullPath(self) -> str:
"""获取完整路径"""
return self.originPath

View File

@@ -241,7 +241,7 @@ def ToType(
type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None) type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None)
type_final = type_components[-1] type_final = type_components[-1]
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\ PrintColorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
f"typen: {typen}, type_components: {type_components}") f"typen: {typen}, type_components: {type_components}")
if type_module is not None: if type_module is not None:
return sys.modules[type_module].__dict__[type_final] return sys.modules[type_module].__dict__[type_final]
@@ -304,7 +304,7 @@ def DecayType(
return type_hint return type_hint
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}") PrintColorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
result: type|List[type] = None result: type|List[type] = None
@@ -333,7 +333,7 @@ def DecayType(
raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>") raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>")
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Result: {result}") PrintColorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
return result return result
def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool: def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool:
@@ -456,7 +456,7 @@ class ValueInfo(BaseInfo):
super().__init__(**kwargs) super().__init__(**kwargs)
self._RealType = metaType self._RealType = metaType
if GetInternalReflectionDebug() and len(generic_args) > 0: if GetInternalReflectionDebug() and len(generic_args) > 0:
print_colorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
f"metaType={metaType}, generic_args={generic_args}") f"metaType={metaType}, generic_args={generic_args}")
self._GenericArgs = generic_args self._GenericArgs = generic_args
if not isinstance(metaType, type): if not isinstance(metaType, type):
@@ -546,7 +546,7 @@ class ValueInfo(BaseInfo):
**kwargs **kwargs
) -> 'ValueInfo': ) -> 'ValueInfo':
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\ PrintColorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
f"metaType={metaType}, SelfType={SelfType}") f"metaType={metaType}, SelfType={SelfType}")
if isinstance(metaType, type): if isinstance(metaType, type):
if metaType is list: if metaType is list:
@@ -601,7 +601,7 @@ class FieldInfo(MemberInfo):
selfType: type|Any|None = None selfType: type|Any|None = None
): ):
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\ PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ") f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ")
super().__init__( super().__init__(
name = name, name = name,
@@ -611,7 +611,7 @@ class FieldInfo(MemberInfo):
) )
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType) self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\ PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}") f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}")
@property @property
@@ -746,7 +746,7 @@ class MethodInfo(MemberInfo):
is_class_method: bool, is_class_method: bool,
): ):
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})") f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})")
MemberInfo.__init__(self, name, ctype, is_static, is_public) MemberInfo.__init__(self, name, ctype, is_static, is_public)
self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType) self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType)
@@ -1143,12 +1143,12 @@ class RefType(ValueInfo):
def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]: def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]:
if currentType.IsPrimitive: if currentType.IsPrimitive:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return f"{currentType.RealType}" return f"{currentType.RealType}"
elif currentType.RealType in type_set: elif currentType.RealType in type_set:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return { return {
"type": f"{currentType.RealType}", "type": f"{currentType.RealType}",
@@ -1156,13 +1156,13 @@ class RefType(ValueInfo):
} }
else: else:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
type_set.add(currentType.RealType) type_set.add(currentType.RealType)
value = {} value = {}
fields = currentType.GetFields() fields = currentType.GetFields()
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}") PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
for field in fields: for field in fields:
value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)) value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType))
return { return {
@@ -1429,7 +1429,7 @@ class TypeManager(BaseModel):
if data is None: if data is None:
raise ReflectionException("data is None") raise ReflectionException("data is None")
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}") PrintColorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型 # 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache: if isinstance(data, str) and data in self._string_to_type_cache:
@@ -1454,7 +1454,7 @@ class TypeManager(BaseModel):
# 添加到弱引用缓存 # 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type) self._weak_refs[type_id] = weakref.ref(ref_type)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Get "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Get "\
f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\ f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\
f"{ConsoleFrontColor.RESET}{ref_type.ToString()}") f"{ConsoleFrontColor.RESET}{ref_type.ToString()}")
return ref_type return ref_type
@@ -1507,7 +1507,7 @@ class TypeManager(BaseModel):
try: try:
ref_type = RefType(metaType) ref_type = RefType(metaType)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Create "\ PrintColorful(ConsoleFrontColor.RED, f"Create "\
f"{ConsoleFrontColor.RESET}{metaType} "\ f"{ConsoleFrontColor.RESET}{metaType} "\
f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}") f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}")
self._RefTypes[metaType] = ref_type self._RefTypes[metaType] = ref_type

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,264 +0,0 @@
# Visual 模块
Visual模块提供了数据可视化和图像处理相关的功能包括数据图表、图像处理、词云等。
## 目录结构
- `Core.py`: 核心数据可视化功能
- `OpenCV.py`: OpenCV图像处理功能
- `WordCloud.py`: 词云生成功能
- `Manim.py`: 数学动画功能
## 功能特性
### 1. 数据可视化 (Core.py)
#### 1.1 基础图表
- 折线图
- 柱状图
- 散点图
- 直方图
- 饼图
- 箱线图
- 热力图
- 分类数据图
- 联合图
#### 1.2 数据处理
- 缺失值处理
- 重复值处理
- 数据标准化
- 数据归一化
### 2. 图像处理 (OpenCV.py)
#### 2.1 图像操作
- 图像加载
- 支持多种格式jpg, png, bmp等
- 支持从文件路径或URL加载
- 支持从内存缓冲区加载
- 图像保存
- 支持多种格式输出
- 支持质量参数设置
- 支持压缩选项
- 图像显示
- 支持窗口标题设置
- 支持窗口大小调整
- 支持键盘事件处理
- 图像转换
- RGB转灰度
- RGB转HSV
- RGB转LAB
- 支持自定义转换矩阵
- 图像缩放
- 支持多种插值方法
- 支持保持宽高比
- 支持指定目标尺寸
- 图像旋转
- 支持任意角度旋转
- 支持旋转中心点设置
- 支持旋转后尺寸调整
- 图像翻转
- 水平翻转
- 垂直翻转
- 对角线翻转
- 图像合并
- 支持多图像拼接
- 支持透明度混合
- 支持蒙版处理
#### 2.2 ImageObject类详解
ImageObject类提供了完整的图像处理功能
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 基本属性
width = image.width # 图像宽度
height = image.height # 图像高度
channels = image.channels # 通道数
dtype = image.dtype # 数据类型
# 图像处理
image.resize_image(800, 600) # 调整大小
image.convert_to_grayscale() # 转换为灰度图
image.filter_gaussian((5, 5), 1.5, 1.5) # 高斯滤波
image.rotate_image(45) # 旋转45度
image.flip_image(horizontal=True) # 水平翻转
# 图像增强
image.adjust_brightness(1.2) # 调整亮度
image.adjust_contrast(1.5) # 调整对比度
image.adjust_saturation(0.8) # 调整饱和度
image.equalize_histogram() # 直方图均衡化
# 边缘检测
image.detect_edges(threshold1=100, threshold2=200) # Canny边缘检测
image.detect_contours() # 轮廓检测
# 特征提取
keypoints = image.detect_keypoints() # 关键点检测
descriptors = image.compute_descriptors() # 描述子计算
# 图像保存
image.save_image("output.jpg", quality=95) # 保存图像
image.save_image("output.png", compression=9) # 保存PNG
# 图像显示
image.show_image("预览") # 显示图像
image.wait_key(0) # 等待按键
# 图像信息
print(image.get_info()) # 获取图像信息
print(image.get_histogram()) # 获取直方图
```
#### 2.3 图像增强
- 边缘检测
- 滤波处理
- 阈值处理
- 形态学操作
- 轮廓检测
- 特征匹配
#### 2.4 视频处理
- 视频读取
- 视频写入
- 摄像头控制
- 帧处理
### 3. 词云生成 (WordCloud.py)
#### 3.1 词云功能
- 词云创建
- 标题设置
- 渲染输出
- 样式定制
### 4. 数学动画 (Manim.py)
#### 4.1 动画功能
- 数学公式动画
- 几何图形动画
- 图表动画
- 场景管理
## 使用示例
### 1. 数据可视化示例
```python
from Convention.Visual import Core
# 创建数据可视化生成器
generator = Core.data_visual_generator("data.csv")
# 绘制折线图
generator.plot_line("x", "y", title="折线图示例")
# 绘制柱状图
generator.plot_bar("category", "value", title="柱状图示例")
# 绘制散点图
generator.plot_scatter("x", "y", title="散点图示例")
# 绘制饼图
generator.plot_pie("category", title="饼图示例")
```
### 2. 图像处理示例
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 图像处理
image.resize_image(800, 600)
image.convert_to_grayscale()
image.filter_gaussian((5, 5), 1.5, 1.5)
# 保存图像
image.save_image("output.jpg")
```
### 3. 词云生成示例
```python
from Convention.Visual import WordCloud
# 创建词云
wordcloud = WordCloud.make_word_cloud("词云", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
])
# 设置标题
WordCloud.set_title(wordcloud, "编程语言词云")
# 渲染输出
WordCloud.render_to(wordcloud, "wordcloud.html")
```
### 4. 视频处理示例
```python
from Convention.Visual import OpenCV
# 创建视频捕获对象
camera = OpenCV.light_cv_camera(0)
# 创建视频写入对象
writer = OpenCV.VideoWriterInstance(
"output.avi",
OpenCV.avi_with_Xvid_fourcc(),
30.0,
(640, 480)
)
# 录制视频
def stop_condition():
return OpenCV.is_current_key('q')
camera.recording(stop_condition, writer)
```
## 依赖项
- matplotlib: 数据可视化
- seaborn: 高级数据可视化
- opencv-python: 图像处理
- pyecharts: 词云生成
- manim: 数学动画
## 注意事项
1. 使用图像处理时注意内存占用
2. 视频处理时注意帧率设置
3. 词云生成时注意数据量
4. 动画制作时注意性能优化
## 性能优化
1. 使用图像处理时注意批量处理
2. 视频处理时使用合适的编码格式
3. 词云生成时控制词数
4. 动画制作时优化渲染设置
## 贡献指南
欢迎提交Issue和Pull Request来改进功能或添加新特性。

View File

@@ -1,66 +0,0 @@
from ..Internal import *
from pyecharts.charts import WordCloud
from pyecharts import options as opts
from pyecharts import types
#from ..File.Core import tool_file, UnWrapper as UnWrapper2Str
def make_word_cloud(
series_name: str,
data_pair: Sequence[Tuple[str, int]],
**kwargs,
):
wordcloud = WordCloud()
wordcloud.add(series_name, data_pair, **kwargs)
return wordcloud
def set_title(
wordcloud: WordCloud,
title: str
):
wordcloud.set_global_opts(
title_opts=opts.TitleOpts(title=title)
)
def render_to(
wordcloud: WordCloud,
file_name: Union[tool_file, str]
):
wordcloud.render(UnWrapper2Str(file_name))
class light_word_cloud(left_value_reference[WordCloud]):
def __init__(
self,
series_name: str,
data_pair: types.Sequence,
**kwargs,
):
super().__init__(make_word_cloud(series_name, data_pair, **kwargs))
def set_title(
self,
title: str
):
set_title(self.ref_value, title)
def render_to(
self,
file_name: Union[tool_file, str]
):
render_to(self.ref_value, file_name)
if __name__ == "__main__":
# 准备数据
wordcloud = make_word_cloud("", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
("Go", 60),
("Rust", 50),
("C#", 40),
("PHP", 30),
("Swift", 20),
("Kotlin", 10),
], word_size_range=[20, 100])
set_title(wordcloud, "cloud")
render_to(wordcloud, "wordcloud.html")

View File

@@ -1,9 +1,10 @@
import sys import sys
import os import os
from time import sleep
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Convention.Runtime.File import * from Convention.Runtime.File import *
first = ToolFile("E:/dev/") file = ToolFile("[Test]")|"temp"|None
second = ToolFile("/analyze/") print(file.MustExistsPath())
print(first|second)