Compare commits

..

19 Commits

Author SHA1 Message Date
007db5a06b Merge branch 'main' of http://www.liubai.site:3000/ninemine/Convention-Python 2025-10-14 19:29:03 +08:00
c11469a108 ToolFile+AbsPath 2025-10-14 19:26:05 +08:00
81209e85ee 增强ToolFile对斜杠的判断与相关行为 2025-10-14 17:09:04 +08:00
97c57f65df Update ToolFile 2025-10-13 17:01:21 +08:00
5b38b6239e Update ToolFile 2025-10-13 16:25:19 +08:00
f146d241eb 1.File新增逐行读取迭代2.Rename PrintColorful 2025-10-13 15:39:29 +08:00
7ff00a8ab9 放弃EP Visual 2025-09-30 14:49:39 +08:00
b02eafcb35 EP Interaction 2025-09-30 10:40:58 +08:00
ecaab13948 EP Interaction 2025-09-29 16:22:10 +08:00
45a6689db8 GlobalConfig新增find字段 2025-09-26 10:34:34 +08:00
3cb7b11756 Try Web.py 2025-09-25 14:26:32 +08:00
4010d9dd8c Update Architecture.py 2025-09-25 11:52:26 +08:00
6331c8e025 忽略.vscode/ 2025-09-22 10:05:03 +08:00
ninemine
8365867823 Delete .vscode directory 2025-09-22 10:00:01 +08:00
494bf300be 修复一些bug 2025-09-20 03:39:51 +08:00
f1197d203b Update README.md 2025-09-09 09:55:22 +08:00
ninemine
f3f3cc8c54 Merge pull request #1 from NINEMINEsigma/EP-Asynchrony
EP Asynchrony
2025-07-25 10:54:01 +08:00
2bb6f924df EP Asynchrony 2025-07-25 10:48:07 +08:00
4d0f24fd0c EP Asynchrony 非因果型异步架构 2025-07-24 11:45:44 +08:00
20 changed files with 2247 additions and 3344 deletions

5
.gitignore vendored
View File

@@ -178,4 +178,7 @@ cython_debug/
# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
# refer to https://docs.cursor.com/context/ignore-files
.cursorignore
.cursorindexingignore
.cursorindexingignore
# IDE
.vscode/

View File

@@ -1,3 +0,0 @@
{
"python.languageServer": "None"
}

View File

@@ -5,28 +5,38 @@ from abc import ABC, abstractmethod
class ISignal(ABC):
pass
class IModel(ABC):
pass
class IDataModel(ABC, IModel):
@abstractmethod
def Save(self) -> str:
pass
@abstractmethod
def Load(self, data:str) -> None:
pass
class IConvertable[T](ABC):
@abstractmethod
def ConvertTo(self) -> T:
pass
class IConvertModel[T](IConvertable[T], IModel):
pass
class SingletonModel[T](IModel):
_InjectInstances:Dict[type,Any] = {}
@staticmethod
def GetInstance(t:Typen[T]) -> T:
return SingletonModel._InjectInstances[t]
@staticmethod
def SetInstance(t:Typen[T], obj:T) -> None:
SingletonModel._InjectInstances[t] = obj
@@ -34,9 +44,6 @@ class SingletonModel[T](IModel):
def __init__(self, t:Typen[T]) -> None:
self.typen: type = t
@override
def Save(self) -> str:
return SingletonModel.GetInstance(self.typen).Save()
class DependenceModel(IConvertModel[bool]):
def __init__(self, queries:Sequence[IConvertModel[bool]]) -> None:
@@ -52,14 +59,10 @@ class DependenceModel(IConvertModel[bool]):
def __iter__(self):
return iter(self.queries)
def Load(self, data:str):
raise NotImplementedError()
def Save(self) -> str:
raise NotImplementedError()
SignalListener = Callable[[ISignal], None]
class Architecture:
@staticmethod
def FormatType(t:type) -> str:
@@ -78,14 +81,11 @@ class Architecture:
@classmethod
def InternalReset(cls) -> None:
# Register System
cls._RegisterHistory.clear()
cls._UncompleteTargets.clear()
cls._Completer.clear()
cls._Dependences.clear()
cls._Childs.clear()
# Event Listener
cls._RegisteredObjects.clear()
cls._RegisteringRuntime.clear()
# Signal Listener
cls._SignalListener.clear()
# Linear Chain for Dependence
# Timeline/Chain
cls._TimelineQueues.clear()
cls._TimelineContentID = 0
@@ -97,136 +97,79 @@ class Architecture:
@override
def ConvertTo(self) -> bool:
return self._queryType in Architecture._Childs
def Load(self, data:str) -> None:
raise NotImplementedError()
def Save(self) -> str:
raise NotImplementedError()
return self._queryType in Architecture._RegisteredObjects
_RegisterHistory: Set[type] = set()
_UncompleteTargets: Dict[type,Any] = {}
_Completer: Dict[type,Action] = {}
_Dependences: Dict[type,DependenceModel] = {}
_Childs: Dict[type,Any] = {}
class Registering(IConvertModel[bool]):
def __init__(self,registerSlot:type) -> None:
self._registerSlot:type = registerSlot
def __init__(self, registerSlot:type, target:Any, dependences:DependenceModel, action:Action) -> None:
self.registerSlot = registerSlot
self.target = target
self.dependences = dependences
self.action = action
@override
def ConvertTo(self) -> bool:
return self._registerSlot in Architecture._Childs
return self.dependences.ConvertTo()
@override
def Load(self,data:str) -> None:
raise InvalidOperationError(f"Cannot use {self.__class__.__name__} to load type")
@override
def Save(self) -> str:
return f"{Architecture.FormatType(self._registerSlot)}[{self.ConvertTo()}]"
_RegisteringRuntime: Dict[type, Registering] = {}
_RegisteredObjects: Dict[type, Any] = {}
@classmethod
def _InternalRegisteringComplete(cls) -> tuple[bool,Set[type]]:
resultSet: Set[type] = set()
stats: bool = False
for dependence in cls._Dependences.keys():
if cls._Dependences[dependence].ConvertTo():
resultSet.add(dependence)
stats = True
return stats,resultSet
def _InternalRegisteringComplete(cls) -> None:
CompletedSet: Set[Architecture.Registering] = set()
for dependence in cls._RegisteringRuntime.keys():
if cls._RegisteringRuntime[dependence].dependences.ConvertTo():
CompletedSet.add(cls._RegisteringRuntime[dependence])
for complete in CompletedSet:
del cls._RegisteringRuntime[complete.registerSlot]
complete.action()
cls._RegisteredObjects[complete.registerSlot] = complete.target
if len(CompletedSet) > 0:
cls._InternalRegisteringComplete()
@classmethod
def _InternalRegisteringUpdate(cls, internalUpdateBuffer:Set[type]):
for complete in internalUpdateBuffer:
cls._Dependences.pop(complete, None)
for complete in internalUpdateBuffer:
cls._Completer[complete]()
cls._Completer.pop(complete, None)
for complete in internalUpdateBuffer:
cls._Childs[complete] = cls._UncompleteTargets[complete]
cls._UncompleteTargets.pop(complete, None)
@classmethod
def Register(cls, slot:type, target:Any, completer:Action, *dependences:type) -> 'Architecture.Registering':
if slot in cls._RegisterHistory:
def Register(cls, slot:type, target:Any, action:Action, *dependences:type) -> DependenceModel:
if slot in cls._RegisteringRuntime:
raise InvalidOperationError("Illegal duplicate registrations")
cls._RegisterHistory.add(slot)
cls._Completer[slot] = completer
cls._UncompleteTargets[slot] = target
# 过滤掉自身依赖
filtered_deps = [dep for dep in dependences if dep != slot]
type_queries = [cls.TypeQuery(dep) for dep in filtered_deps]
cls._Dependences[slot] = DependenceModel(type_queries)
while True:
has_complete, buffer = cls._InternalRegisteringComplete()
if not has_complete:
break
cls._InternalRegisteringUpdate(buffer)
return cls.Registering(slot)
@classmethod
def RegisterGeneric[T](cls, target:T, completer:Action, *dependences:type) -> 'Architecture.Registering':
return cls.Register(type(target), target, completer, *dependences)
cls._RegisteringRuntime[slot] = Architecture.Registering(slot, target, DependenceModel(Architecture.TypeQuery(dependence) for dependence in dependences), action)
cls._InternalRegisteringComplete()
return cls._RegisteringRuntime[slot].dependences
@classmethod
def Contains(cls, type_:type) -> bool:
return type_ in cls._Childs
@classmethod
def ContainsGeneric[T](cls) -> bool:
return cls.Contains(type(T))
@classmethod
def InternalGet(cls, type_:type) -> Any:
return cls._Childs[type_]
return type_ in cls._RegisteredObjects
@classmethod
def Get(cls, type_:type) -> Any:
return cls.InternalGet(type_)
return cls._RegisteredObjects[type_]
@classmethod
def GetGeneric[T](cls) -> T:
return cls.Get(type(T))
def Unregister(cls, slot:type) -> bool:
if slot in cls._RegisteredObjects:
del cls._RegisteredObjects[slot]
return True
if slot in cls._RegisteringRuntime:
del cls._RegisteringRuntime[slot]
return True
return False
#endregion
#region Signal & Update
_SignalListener: Dict[type, Set[SignalListener]] = {}
class Listening:
def __init__(self, action:SignalListener, type_:type):
self._action = action
self._type = type_
def StopListening(self):
if self._type in Architecture._SignalListener:
Architecture._SignalListener[self._type].discard(self._action)
_SignalListener: Dict[type, List[SignalListener]] = {}
@classmethod
def AddListenerGeneric[Signal](cls, slot:type, listener:SignalListener) -> 'Architecture.Listening':
def AddListener(cls, slot:type, listener:SignalListener) -> None:
if slot not in cls._SignalListener:
cls._SignalListener[slot] = set()
cls._SignalListener[slot] = []
def action(signal:ISignal):
if isinstance(signal, slot):
listener(signal)
result = cls.Listening(action, slot)
cls._SignalListener[slot].add(action)
return result
cls._SignalListener[slot].append(listener)
@classmethod
def SendMessage(cls, slot:type, signal:ISignal):
if slot in cls._SignalListener:
for action in cls._SignalListener[slot]:
action(signal)
for listener in cls._SignalListener[slot]:
listener(signal)
#endregion
@@ -285,6 +228,3 @@ class Architecture:
#endregion

View File

@@ -0,0 +1,353 @@
from .Config import *
from .Reflection import *
from collections import defaultdict
import asyncio
import threading
from typing import Optional
from pydantic import BaseModel
from abc import ABC, abstractmethod
class AsyncContextDetector:
"""异步上下文检测工具类"""
@staticmethod
def is_in_async_context() -> bool:
"""检查是否在异步上下文中运行"""
try:
asyncio.current_task()
return True
except RuntimeError:
return False
@staticmethod
def get_current_loop() -> Optional[asyncio.AbstractEventLoop]:
"""获取当前事件循环如果没有则返回None"""
try:
return asyncio.get_running_loop()
except RuntimeError:
return None
@staticmethod
def ensure_async_context_safe(operation_name: str) -> None:
"""确保在异步上下文中执行是安全的"""
if AsyncContextDetector.is_in_async_context():
raise RuntimeError(
f"Cannot perform '{operation_name}' from within an async context. "
f"Use await or async methods instead."
)
class AsyncFieldAccessor:
"""异步字段访问器,封装字段访问逻辑"""
def __init__(
self,
async_fields: Dict[str, 'AsynchronyExpression'],
origin_fields: Dict[str, FieldInfo]
) -> None:
self._async_fields = async_fields
self._origin_fields = origin_fields
async def get_field_value_async(self, field_name: str):
"""异步获取字段值"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
return await self._async_fields[field_name].get_value()
def get_field_value_sync(self, field_name: str):
"""同步获取字段值(仅在非异步上下文中使用)"""
AsyncContextDetector.ensure_async_context_safe(f"sync access to field '{field_name}'")
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
async_expr = self._async_fields[field_name]
if not async_expr.is_initialize and async_expr.timeout > 0:
# 需要等待但在同步上下文中使用run_async
return run_async(async_expr.get_value())
elif not async_expr.is_initialize:
raise RuntimeError(f"Field '{field_name}' is not initialized and has no timeout")
else:
return run_async(async_expr.get_value())
def is_field_initialized(self, field_name: str) -> bool:
"""检查字段是否已初始化"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
return self._async_fields[field_name].is_initialize
def set_field_value(self, field_name: str, value: Any) -> None:
"""设置字段值"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
self._async_fields[field_name].set_value(value)
class AsynchronyUninitialized:
"""表示未初始化状态的单例类"""
__instance__ = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if cls.__instance__ is None:
with cls._lock:
if cls.__instance__ is None:
cls.__instance__ = super().__new__(cls)
return cls.__instance__
def __repr__(self):
return "uninitialized"
def __str__(self):
return "None"
class AsynchronyExpression:
def __init__(
self,
field: FieldInfo,
value: Any = AsynchronyUninitialized(),
*,
time_wait: float = 0.1,
timeout: float = 0,
callback: Optional[Action] = None,
):
'''
参数:
field: 字段
value: 初始化, 默认为AsynchronyUninitialized, 即无初始化
time_wait: 等待时间, 默认为0.1秒
timeout: 超时时间, 默认为0秒
callback: 回调函数, 默认为None, 当状态为无初始化时get_value会调用callback
'''
self.field = field
self._value = value
self.callback = callback
self.is_initialize = not isinstance(value, AsynchronyUninitialized)
self.time_wait = time_wait
self.timeout = timeout
def get_value_sync(self):
if self.is_initialize:
return self._value
elif self.callback is not None:
self.callback()
if self.is_initialize:
return self._value
else:
raise RuntimeError(f"Field {self.field.FieldName} is not initialized")
async def get_value(self):
"""异步获取字段值,改进的超时机制"""
if self.is_initialize:
return self._value
elif self.callback is not None:
self.callback()
if self.timeout > 0:
try:
# 使用 asyncio.wait_for 提供更精确的超时控制
async def wait_for_initialization():
while not self.is_initialize:
await asyncio.sleep(self.time_wait)
return self._value
return await asyncio.wait_for(wait_for_initialization(), timeout=self.timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"Timeout waiting for uninitialized field {self.field.FieldName}")
else:
# 无超时,一直等待
while not self.is_initialize:
await asyncio.sleep(self.time_wait)
return self._value
def set_value(self, value: Any) -> None:
"""设置字段值"""
if isinstance(value, AsynchronyUninitialized):
self.set_uninitialized()
elif self.field.Verify(type(value)):
self._value = value
self.is_initialize = True
else:
raise ValueError(f"Value {value} is not valid for field {self.field.FieldName}")
def SetUninitialized(self) -> None:
"""设置为未初始化状态(保持兼容性的旧方法名)"""
self.set_uninitialized()
def set_uninitialized(self) -> None:
"""设置为未初始化状态"""
if self.is_initialize:
del self._value
self._value = AsynchronyUninitialized()
self.is_initialize = False
class Asynchronous(ABC):
__Asynchronous_Origin_Fields__: Dict[Type, Dict[str, FieldInfo]] = defaultdict(dict)
_fields_lock = threading.Lock()
def _GetAsynchronousOriginFields(self) -> Dict[str, FieldInfo]:
return Asynchronous.__Asynchronous_Origin_Fields__[type(self)]
def __init__(self, **kwargs: Dict[str, dict]):
super().__init__()
self.__Asynchronous_Fields__: Dict[str, AsynchronyExpression] = {}
# 使用线程锁保护类变量访问
with Asynchronous._fields_lock:
origin_fields = self._GetAsynchronousOriginFields()
for field_info in TypeManager.GetInstance().CreateOrGetRefTypeFromType(type(self)).GetAllFields():
if field_info.FieldName == "__Asynchronous_Origin_Fields__":
continue
origin_fields[field_info.FieldName] = field_info
self.__Asynchronous_Fields__[field_info.FieldName] = AsynchronyExpression(
field_info, **kwargs.get(field_info.FieldName, {})
)
# 创建字段访问器以提升性能
self._field_accessor = AsyncFieldAccessor(self.__Asynchronous_Fields__, origin_fields)
def __getattribute__(self, name: str) -> Any:
# 快速路径:非异步字段直接返回
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
return super().__getattribute__(name)
# 一次性获取所需属性,避免重复调用
try:
field_accessor:AsyncFieldAccessor = super().__getattribute__("_field_accessor")
origin_fields:Dict[str, FieldInfo] = super().__getattribute__("_GetAsynchronousOriginFields")()
except AttributeError:
# 对象可能尚未完全初始化
return super().__getattribute__(name)
if name in origin_fields:
# 这是一个异步字段
if AsyncContextDetector.is_in_async_context():
# 在异步上下文中,提供友好的错误提示
async_fields:Dict[str, AsynchronyExpression] = super().__getattribute__("__Asynchronous_Fields__")
async_expr = async_fields[name]
if not async_expr.is_initialize:
timeout_info = f" with {async_expr.timeout}s timeout" if async_expr.timeout > 0 else ""
raise RuntimeError(
f"Field '{name}' is not initialized{timeout_info}. "
)
else:
# 字段已初始化,直接返回值
return async_expr.get_value_sync()
else:
# 在同步上下文中,使用字段访问器
try:
return field_accessor.get_field_value_sync(name)
except RuntimeError as e:
if "Cannot perform" in str(e):
# 重新包装错误信息,提供更友好的提示
raise RuntimeError(
f"Cannot access async field '{name}' from sync context when it requires initialization. "
f"Use async context or ensure field is pre-initialized."
) from e
else:
raise
return super().__getattribute__(name)
def __setattr__(self, name: str, value: Any) -> None:
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
super().__setattr__(name, value)
elif hasattr(self, '_field_accessor'):
# 对象已初始化,使用字段访问器
try:
field_accessor = super().__getattribute__("_field_accessor")
field_accessor.set_field_value(name, value)
return
except AttributeError:
# 不是异步字段
pass
super().__setattr__(name, value)
def __delattr__(self, name: str) -> None:
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
super().__delattr__(name)
elif hasattr(self, '_field_accessor'):
# 对象已初始化,使用字段访问器
try:
field_accessor = super().__getattribute__("_field_accessor")
origin_fields = super().__getattribute__("_GetAsynchronousOriginFields")()
if name in origin_fields:
async_fields = super().__getattribute__("__Asynchronous_Fields__")
async_fields[name].set_uninitialized()
return
except AttributeError:
# 不是异步字段
pass
super().__delattr__(name)
def is_field_initialized(self, field_name: str) -> bool:
"""检查字段是否已初始化"""
return self._field_accessor.is_field_initialized(field_name)
def run_until_complete(coro: Coroutine) -> Any:
"""Gets an existing event loop to run the coroutine.
If there is no existing event loop, creates a new one.
"""
try:
# Check if there's an existing event loop
loop = asyncio.get_event_loop()
# If we're here, there's an existing loop but it's not running
return loop.run_until_complete(coro)
except RuntimeError:
# If we can't get the event loop, we're likely in a different thread, or its already running
try:
return asyncio.run(coro)
except RuntimeError:
raise RuntimeError(
"Detected nested async. Please use nest_asyncio.apply() to allow nested event loops."
"Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc."
)
def run_async_coroutine(coro: Coroutine) -> Any:
try:
# Check if there's an existing event loop
loop = asyncio.get_event_loop()
# If we're here, there's an existing loop but it's not running
return loop.create_task(coro)
except RuntimeError:
# If we can't get the event loop, we're likely in a different thread, or its already running
try:
return asyncio.run(coro)
except RuntimeError:
raise RuntimeError(
"Detected nested async. Please use nest_asyncio.apply() to allow nested event loops."
"Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc."
)
def run_async(coro: Coroutine):
"""安全地运行异步协程,避免事件循环死锁"""
# 使用统一的异步上下文检测
AsyncContextDetector.ensure_async_context_safe("run_async")
# 尝试获取当前事件循环
current_loop = AsyncContextDetector.get_current_loop()
if current_loop is not None and not current_loop.is_running():
# 有事件循环但未运行,直接使用
return current_loop.run_until_complete(coro)
elif current_loop is None:
# 没有事件循环,创建新的
try:
return asyncio.run(coro)
except RuntimeError as e:
raise RuntimeError(
"Failed to run async coroutine. "
"Please ensure proper async environment or use nest_asyncio.apply() for nested loops."
) from e
else:
# 事件循环正在运行这种情况应该被AsyncContextDetector捕获
raise RuntimeError(
"Unexpected state: running event loop detected but context check passed. "
"This should not happen."
)

View File

@@ -5,10 +5,35 @@ import sys
import threading
import traceback
import datetime
import platform
import time
import os
from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle
try:
from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle
except:
print("colorama is not installed")
class ConsoleFrontColor:
RED = ""
GREEN = ""
YELLOW = ""
BLUE = ""
MAGENTA = ""
CYAN = ""
WHITE = ""
RESET = ""
class ConsoleBackgroundColor:
RED = ""
GREEN = ""
YELLOW = ""
BLUE = ""
MAGENTA = ""
CYAN = ""
WHITE = ""
RESET = ""
class ConsoleStyle:
RESET = ""
BOLD = ""
DIM = ""
UNDERLINE = ""
REVERSE = ""
HIDDEN = ""
class NotImplementedError(Exception):
def __init__(self, message:Optional[str]=None) -> None:
@@ -35,7 +60,7 @@ def GetInternalDebug() -> bool:
global INTERNAL_DEBUG
return INTERNAL_DEBUG
def print_colorful(color:str, *args, is_reset:bool=True, **kwargs):
def PrintColorful(color:str, *args, is_reset:bool=True, **kwargs):
with lock_guard():
if is_reset:
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
@@ -303,6 +328,12 @@ class PlatformIndicator:
CompanyName : str = "DefaultCompany"
ProductName : str = "DefaultProject"
@staticmethod
def GetFileSeparator(is_not_this_platform:bool = False) -> str:
if PlatformIndicator.IsPlatformWindows and not is_not_this_platform:
return "\\"
return "/"
@staticmethod
def GetApplicationPath() -> str:
"""获取应用程序所在目录"""
@@ -350,3 +381,25 @@ class DescriptiveIndicator[T]:
self.descripion : str = description
self.value : T = value
class Switch:
def __init__(self, value, isThougth = False) -> None:
self.value = value
self.isThougth = False
self.caseStats = False
self.result = None
def Case(self, caseValue, callback:Callable[[], Any]) -> 'Switch':
if self.caseStats and self.isThougth:
self.result = callback()
elif caseValue == self.value:
self.caseStats = True
self.result = callback()
return self
def Default(self, callback:Callable[[], Any]) -> Any:
if self.caseStats and self.isThougth:
self.result = callback()
elif self.caseStats == False:
self.caseStats = True
self.result = callback()
return self.result

View File

@@ -180,14 +180,14 @@ class ESReader(BaseModel):
'''
#module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.')
#if GetInternalEasySaveDebug():
# print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
# PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
# f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\
# f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}")
#typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name)
#return TypeManager.GetInstance().CreateOrGetRefType(typen_to)
typen, assembly_name = ReadAssemblyTypen(type_label)
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\
f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}")
return TypeManager.GetInstance().CreateOrGetRefType(typen)
@@ -235,7 +235,7 @@ class ESReader(BaseModel):
if rtype is None:
raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}")
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\
f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}")
# 处理值类型
@@ -278,7 +278,7 @@ class ESReader(BaseModel):
else:
rinstance = rtype.CreateInstance()
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\
f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}")
fields:List[FieldInfo] = self._GetFields(rtype)
for field in fields:
@@ -289,19 +289,19 @@ class ESReader(BaseModel):
if field.FieldType == list and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == set and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == tuple and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == dict and field.ValueType.IsGeneric:
@@ -309,13 +309,13 @@ class ESReader(BaseModel):
DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1])
)
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\
f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>")
else:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\
f"<{field_rtype.GenericArgs}>")
field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName]))

View File

@@ -1,10 +1,8 @@
import os.path
from .Config import *
import json
import shutil
import pandas as pd
import os
import sys
import pickle
import zipfile
import tarfile
import base64
@@ -14,21 +12,6 @@ import datetime
import stat
from typing import *
from pathlib import Path
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
from .String import Bytes2String
def GetExtensionName(file:str):
return os.path.splitext(file)[1][1:]
@@ -67,8 +50,10 @@ class PermissionError(FileOperationError):
"""权限操作异常"""
pass
from pydantic import BaseModel, GetCoreSchemaHandler, Field
from pydantic_core import core_schema
try:
from pydantic import BaseModel
except ImportError as e:
ImportingThrow(e, "File", ["pydantic"])
class ToolFile(BaseModel):
OriginFullPath:str
@@ -77,7 +62,10 @@ class ToolFile(BaseModel):
self,
filePath: Union[str, Self],
):
super().__init__(OriginFullPath=os.path.abspath(os.path.expandvars(str(filePath))))
filePath = os.path.expandvars(str(filePath))
if ":" in filePath:
filePath = os.path.abspath(filePath)
super().__init__(OriginFullPath=filePath)
def __del__(self):
pass
def __str__(self):
@@ -89,9 +77,21 @@ class ToolFile(BaseModel):
def __or__(self, other):
if other is None:
return ToolFile(self.GetFullPath() if self.IsDir() else self.GetFullPath()+"\\")
return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}{PlatformIndicator.GetFileSeparator()}")
else:
return ToolFile(os.path.join(self.GetFullPath(), str(other)))
# 不使用os.path.join因为os.path.join存在如下机制
# 当参数路径中存在绝对路径风格时,会忽略前面的参数,例如:
# os.path.join("E:/dev", "/analyze/") = "E:/analyze/"
# 而我们需要的是 "E:/dev/analyze"
separator = PlatformIndicator.GetFileSeparator()
separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
first = self.GetFullPath().replace(separator_not_this_platform,separator).strip(separator)
second = str(other).replace(separator_not_this_platform,separator)
if first == "./":
return ToolFile(f"{second}")
elif first == "../":
first = ToolFile(f"{os.path.abspath(first)}").BackToParentDir()
return ToolFile(f"{first}{separator}{second}")
def __idiv__(self, other):
temp = self.__or__(other)
self.OriginFullPath = temp.GetFullPath()
@@ -109,20 +109,23 @@ class ToolFile(BaseModel):
"""
if other is None:
return False
# 获取比较对象的路径
other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other)
self_path = self.OriginFullPath
# 标准化路径,移除末尾的斜线
if self_path.endswith('/') or self_path.endswith('\\'):
self_path = self_path[:-1]
if other_path.endswith('/') or other_path.endswith('\\'):
other_path = other_path[:-1]
# 使用系统的路径规范化函数进行比较
return os.path.normpath(self_path) == os.path.normpath(other_path)
separator = PlatformIndicator.GetFileSeparator()
separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
# 如果两个文件都存在,则直接比较路径
if self.Exists() == True and other.Exists() == True:
return self_path.strip(separator_not_this_platform) == other_path.strip(separator_not_this_platform)
# 如果一个文件存在另一个不被判定为存在则一定不同
elif self.Exists() != other.Exists():
return False
# 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串
else:
return self_path.replace(separator_not_this_platform,separator) == other_path.replace(separator_not_this_platform,separator)
def ToPath(self):
return Path(self.OriginFullPath)
@@ -172,7 +175,7 @@ class ToolFile(BaseModel):
if self.Exists() is False:
raise FileNotFoundError("file not found")
newpath = str(newpath)
if '\\' in newpath or '/' in newpath:
if PlatformIndicator.GetFileSeparator() in newpath or PlatformIndicator.GetFileSeparator(True) in newpath:
newpath = GetBaseFilename(newpath)
new_current_path = os.path.join(self.GetDir(), newpath)
os.rename(self.OriginFullPath, new_current_path)
@@ -183,16 +186,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r', encoding=encoding) as f:
json_data = json.load(f, **kwargs)
return json_data
def LoadAsCsv(self) -> pd.DataFrame:
def LoadAsCsv(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
def LoadAsXml(self) -> pd.DataFrame:
def LoadAsXml(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_xml(f)
def LoadAsDataframe(self) -> pd.DataFrame:
def LoadAsDataframe(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
def LoadAsExcel(self) -> pd.DataFrame:
def LoadAsExcel(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_excel(f)
def LoadAsBinary(self) -> bytes:
@@ -202,18 +221,103 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r') as f:
return f.read()
def LoadAsWav(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_wav(self.OriginFullPath)
def LoadAsAudio(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_file(self.OriginFullPath)
def LoadAsImage(self) -> ImageFile.ImageFile:
def LoadAsImage(self):
try:
from PIL import Image
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
return Image.open(self.OriginFullPath)
def LoadAsDocx(self) -> DocumentObject:
def LoadAsDocx(self) -> "docx.document.Document":
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
return Document(self.OriginFullPath)
def LoadAsUnknown(self, suffix:str) -> Any:
return self.LoadAsText()
def LoadAsModel(self, model:type[BaseModel]) -> BaseModel:
def LoadAsModel(self, model:type["BaseModel"]) -> "BaseModel":
return model.model_validate(self.LoadAsJson())
def ReadLines(self, **kwargs):
with open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = f.readline()
if not line or line == '':
break
yield line
async def ReadLinesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = await f.readline()
if not line or line == '':
break
yield line
def ReadBytes(self, **kwargs):
with open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = f.read(1024)
if not data or data == '':
break
yield data
async def ReadBytesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = await f.read(1024)
if not data or data == '':
break
yield data
def WriteBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'wb', **kwargs) as f:
f.write(data)
async def WriteBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'wb', **kwargs) as f:
await f.write(data)
def WriteLines(self, data:List[str], **kwargs):
with open(self.OriginFullPath, 'w', **kwargs) as f:
f.writelines(data)
async def WriteLinesAsync(self, data:List[str], **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'w', **kwargs) as f:
await f.writelines(data)
def AppendText(self, data:str, **kwargs):
with open(self.OriginFullPath, 'a', **kwargs) as f:
f.write(data)
async def AppendTextAsync(self, data:str, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'a', **kwargs) as f:
await f.write(data)
def AppendBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'ab', **kwargs) as f:
f.write(data)
async def AppendBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'ab', **kwargs) as f:
await f.write(data)
def SaveAsJson(self, json_data):
try:
from pydantic import BaseModel
@@ -225,16 +329,40 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=4)
return self
def SaveAsCsv(self, csv_data:pd.DataFrame):
def SaveAsCsv(self, csv_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
csv_data.to_csv(self.OriginFullPath)
return self
def SaveAsXml(self, xml_data:pd.DataFrame):
def SaveAsXml(self, xml_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
xml_data.to_xml(self.OriginFullPath)
return self
def SaveAsDataframe(self, dataframe_data:pd.DataFrame):
def SaveAsDataframe(self, dataframe_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
dataframe_data.to_csv(self.OriginFullPath)
return self
def SaveAsExcel(self, excel_data:pd.DataFrame):
def SaveAsExcel(self, excel_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
excel_data.to_excel(self.OriginFullPath, index=False)
return self
def SaveAsBinary(self, binary_data:bytes):
@@ -245,13 +373,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w') as f:
f.writelines(text_data)
return self
def SaveAsAudio(self, audio_data:AudioSegment):
def SaveAsAudio(self, audio_data:"AudioSegment"):
'''
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
'''
audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath))
return self
def SaveAsImage(self, image_data:ImageFile.ImageFile):
def SaveAsImage(self, image_data:"ImageFile.ImageFile"):
'''
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
'''
image_data.save(self.OriginFullPath)
return self
def SaveAsDocx(self, docx_data:DocumentObject):
def SaveAsDocx(self, docx_data:"DocumentObject"):
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
docx_data.save(self.OriginFullPath)
return self
def SaveAsUnknown(self, unknown_data:Any):
@@ -267,6 +414,8 @@ class ToolFile(BaseModel):
return os.path.getsize(self.OriginFullPath)
def GetExtension(self):
return GetExtensionName(self.OriginFullPath)
def GetAbsPath(self) -> str:
return os.path.abspath(self.OriginFullPath)
def GetFullPath(self) -> str:
return self.OriginFullPath
def GetFilename(self, is_without_extension = False):
@@ -276,7 +425,7 @@ class ToolFile(BaseModel):
'''
if is_without_extension and '.' in self.OriginFullPath:
return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)]
elif self.OriginFullPath[-1] == '/' or self.OriginFullPath[-1] == '\\':
elif self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator(True):
return GetBaseFilename(self.OriginFullPath[:-1])
else:
return GetBaseFilename(self.OriginFullPath)
@@ -288,7 +437,7 @@ class ToolFile(BaseModel):
return os.path.dirname(self.OriginFullPath)
def IsDir(self):
if self.OriginFullPath[-1] == '\\' or self.GetFullPath()[-1] == '/':
if self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.GetFullPath()[-1] == PlatformIndicator.GetFileSeparator(True):
return True
else:
return os.path.isdir(self.OriginFullPath)
@@ -637,8 +786,11 @@ class ToolFile(BaseModel):
ignore_directories: 是否忽略目录事件
case_sensitive: 是否区分大小写
"""
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
except ImportError as e:
ImportingThrow(e, "File", ["watchdog"])
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
@@ -999,32 +1151,4 @@ class ToolFile(BaseModel):
是否隐藏
"""
return self.get_permissions()['hidden']
def split_elements(
file: Union[ToolFile, str],
*,
ratios: List[float] = [1,1],
pr: Optional[Callable[[ToolFile], bool]] = None,
shuffler: Optional[Callable[[List[ToolFile]], None]] = None,
output_dirs: Optional[List[ToolFile]] = None,
output_must_exist: bool = True,
output_callback: Optional[Callable[[ToolFile], None]] = None
) -> List[List[ToolFile]]:
result: List[List[ToolFile]] = tool_split_elements(WrapperFile(file).dir_tool_file_iter(),
ratios=ratios,
pr=pr,
shuffler=shuffler)
if output_dirs is None:
return result
for i in range(min(len(output_dirs), len(result))):
output_dir: ToolFile = output_dirs[i]
if output_dir.IsDir() is False:
raise Exception("Outputs must be directory")
if output_must_exist:
output_dir.must_exists_as_new()
for file in result[i]:
current = output_dirs[i].MakeFileInside(file)
if output_callback:
output_callback(current)
return result

View File

@@ -53,6 +53,7 @@ class GlobalConfig:
# 检查配置文件,不存在则生成空配置
self._data_pair: Dict[str, Any] = {}
self._data_find: Dict[str, Any] = {}
self._const_config_file = ConstConfigFile
config_file = self.ConfigFile
@@ -160,7 +161,8 @@ class GlobalConfig:
"""保存配置到文件"""
config = self.ConfigFile
config.SaveAsJson({
"properties": self._data_pair
"properties": self._data_pair,
"find": self._data_find
})
return self
@@ -232,6 +234,7 @@ class GlobalConfig:
return self._data_pair[key]
else:
self.LogPropertyNotFound(key, default=default)
self._data_find[key] = default
return default

View File

@@ -0,0 +1,743 @@
from .Config import *
from .File import ToolFile
from .Web import ToolURL
import json
import urllib.parse
import os
from typing import *
try:
from pydantic import BaseModel, PrivateAttr, Field
except ImportError as e:
ImportingThrow(e, "Interaction", ["pydantic"])
try:
import aiofiles
except ImportError as e:
ImportingThrow(e, "Interaction", ["aiofiles"])
class InteractionError(Exception):
"""交互操作异常基类"""
pass
class PathValidationError(InteractionError):
"""路径验证异常"""
pass
class LoadError(InteractionError):
"""加载异常"""
pass
class SaveError(InteractionError):
"""保存异常"""
pass
class Interaction(BaseModel):
"""统一的文件交互类,自适应处理本地文件和网络文件"""
originPath: str
_is_url: bool = PrivateAttr(False)
_is_local: bool = PrivateAttr(False)
_tool_file: Optional[ToolFile] = PrivateAttr(None)
_tool_url: Optional[ToolURL] = PrivateAttr(None)
def __init__(self, path):
"""
从路径字符串创建对象自动识别本地文件或网络URL
Args:
path: 路径字符串或是可以转换为路径字符串的对象
"""
super().__init__(originPath=str(path))
# 自动识别路径类型
self._detect_path_type()
def _detect_path_type(self):
"""自动检测路径类型"""
path = self.originPath.strip()
# 检查是否为HTTP/HTTPS URL
if path.startswith(('http://', 'https://', 'file://')):
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(path)
return
# 检查是否为localhost URL
if path.startswith('localhost'):
# 转换为完整的HTTP URL
if not path.startswith('localhost:'):
# 默认端口80
full_url = f"http://{path}"
else:
full_url = f"http://{path}"
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(full_url)
self.originPath = full_url
return
# 检查是否为绝对路径或相对路径
if (os.path.isabs(path) or
path.startswith('./') or
path.startswith('../') or
':' in path[:3]): # Windows盘符
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
return
# 默认作为相对路径处理
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
def __str__(self) -> str:
"""隐式字符串转换"""
return self.originPath
def __bool__(self) -> bool:
"""隐式布尔转换,检查路径是否有效"""
return self.IsValid
@property
def IsValid(self) -> bool:
"""检查路径是否有效"""
if self._is_url:
return self._tool_url.IsValid if self._tool_url else False
else:
return self._tool_file.Exists() if self._tool_file else False
@property
def IsURL(self) -> bool:
"""是否为网络URL"""
return self._is_url
@property
def IsLocal(self) -> bool:
"""是否为本地文件"""
return self._is_local
@property
def IsFile(self) -> bool:
"""是否为文件对于URL检查是否存在文件名"""
if self._is_url:
return bool(self._tool_url.GetFilename()) if self._tool_url else False
else:
return self._tool_file.IsFile() if self._tool_file else False
@property
def IsDir(self) -> bool:
"""是否为目录(仅对本地路径有效)"""
if self._is_local:
return self._tool_file.IsDir() if self._tool_file else False
return False
def GetFilename(self) -> str:
"""获取文件名"""
if self._is_url:
return self._tool_url.GetFilename() if self._tool_url else ""
else:
return self._tool_file.GetFilename() if self._tool_file else ""
def GetExtension(self) -> str:
"""获取文件扩展名"""
if self._is_url:
return self._tool_url.GetExtension() if self._tool_url else ""
else:
return self._tool_file.GetExtension() if self._tool_file else ""
def ExtensionIs(self, *extensions: str) -> bool:
"""检查扩展名是否匹配"""
if self._is_url:
return self._tool_url.ExtensionIs(*extensions) if self._tool_url else False
else:
current_ext = self.GetExtension()
return current_ext.lower() in [ext.lower().lstrip('.') for ext in extensions]
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
def Open(self, path: str) -> 'Interaction':
"""在当前对象上打开新路径"""
new_obj = Interaction(path)
self.originPath = new_obj.originPath
self._is_url = new_obj._is_url
self._is_local = new_obj._is_local
self._tool_file = new_obj._tool_file
self._tool_url = new_obj._tool_url
return self
# 同步加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsText()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsText()
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsBinary()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsBinary()
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsJson(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
json_data = self._tool_file.LoadAsJson()
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
# 异步加载方法
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsTextAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'r', encoding='utf-8') as f:
return await f.read()
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsBinaryAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'rb') as f:
return await f.read()
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsJsonAsync(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地JSON文件
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise LoadError(f"Failed to parse JSON from {self.originPath}: {str(e)}")
# 同步保存方法
def SaveAsText(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL先下载然后保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsText(content)
return self
def SaveAsBinary(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsBinary(content)
return self
def SaveAsJson(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(data)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsJson(data)
return self
# 异步保存方法
async def SaveAsTextAsync(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
return self
async def SaveAsBinaryAsync(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'wb') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'wb') as f:
await f.write(content)
return self
async def SaveAsJsonAsync(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
# 序列化JSON数据
try:
from pydantic import BaseModel
if isinstance(data, BaseModel):
json_data = data.model_dump()
json_data["__type"] = f"{data.__class__.__name__}, pydantic.BaseModel"
else:
json_data = data
json_content = json.dumps(json_data, indent=4, ensure_ascii=False)
except Exception as e:
raise SaveError(f"Failed to serialize JSON data: {str(e)}")
# 保存JSON内容
return await self.SaveAsTextAsync(json_content, local_path)
# HTTP请求方法仅对URL有效
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Get(callback)
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Post(callback, form_data)
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.GetAsync(callback)
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.PostAsync(callback, form_data)
# 便利方法
def Save(self, local_path: Optional[str] = None) -> 'Interaction':
"""
自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL先下载内容再保存
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
self._tool_url.Save(local_path)
return self
async def SaveAsync(self, local_path: Optional[str] = None) -> 'Interaction':
"""
异步自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL异步下载内容
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
await self.SaveAsTextAsync(content, local_path)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
await self.SaveAsJsonAsync(content, local_path)
else:
content = await self.LoadAsBinaryAsync()
await self.SaveAsBinaryAsync(content, local_path)
except Exception as e:
raise SaveError(f"Failed to save {self.originPath}: {str(e)}")
return self
def Downloadable(self) -> bool:
"""检查是否可下载"""
return self._is_url and self._tool_url.IsValid if self._tool_url else False
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("Download method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.Download(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("DownloadAsync method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.DownloadAsync(local_path)
def Copy(self, target_path) -> ToolFile:
"""
复制文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
新的Interaction对象
"""
if not self._is_local:
raise InteractionError("Copy method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Copy(str(target_path))
def Move(self, target_path) -> ToolFile:
"""
移动文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
更新后的Interaction对象
"""
if not self._is_local:
raise InteractionError("Move method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Move(str(target_path))
def Remove(self) -> 'Interaction':
"""
删除文件(仅对本地文件有效)
Returns:
Interaction对象本身
"""
if not self._is_local:
raise InteractionError("Remove method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.Remove()
return self
def Exists(self) -> bool:
"""
检查文件是否存在
Returns:
是否存在
"""
return self.IsValid
def GetSize(self) -> int:
"""
获取文件大小(仅对本地文件有效)
Returns:
文件大小(字节)
"""
if not self._is_local:
raise InteractionError("GetSize method is only available for local files")
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.GetSize()
def GetDir(self) -> str:
"""
获取目录路径
Returns:
目录路径
"""
if self._is_local:
return self._tool_file.GetDir() if self._tool_file else ""
else:
# 对于URL返回基础URL
if self._tool_url:
parsed = urllib.parse.urlparse(self._tool_url.url)
return f"{parsed.scheme}://{parsed.netloc}"
return ""
def GetParentDir(self) -> 'Interaction':
"""
获取父目录的Interaction对象
Returns:
父目录的Interaction对象
"""
if self._is_local:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
parent_dir = self._tool_file.GetParentDir()
return Interaction(parent_dir.GetFullPath())
else:
# 对于URL返回基础URL
base_url = self.GetDir()
return Interaction(base_url)
def ToString(self) -> str:
"""获取完整路径"""
return self.originPath
def GetFullPath(self) -> str:
"""获取完整路径"""
return self.originPath

View File

@@ -241,7 +241,7 @@ def ToType(
type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None)
type_final = type_components[-1]
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
PrintColorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
f"typen: {typen}, type_components: {type_components}")
if type_module is not None:
return sys.modules[type_module].__dict__[type_final]
@@ -304,7 +304,7 @@ def DecayType(
return type_hint
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
PrintColorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
result: type|List[type] = None
@@ -333,7 +333,7 @@ def DecayType(
raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>")
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
PrintColorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
return result
def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool:
@@ -456,7 +456,7 @@ class ValueInfo(BaseInfo):
super().__init__(**kwargs)
self._RealType = metaType
if GetInternalReflectionDebug() and len(generic_args) > 0:
print_colorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
PrintColorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
f"metaType={metaType}, generic_args={generic_args}")
self._GenericArgs = generic_args
if not isinstance(metaType, type):
@@ -546,7 +546,7 @@ class ValueInfo(BaseInfo):
**kwargs
) -> 'ValueInfo':
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
PrintColorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
f"metaType={metaType}, SelfType={SelfType}")
if isinstance(metaType, type):
if metaType is list:
@@ -565,7 +565,7 @@ class ValueInfo(BaseInfo):
return ValueInfo(metaType, **kwargs)
else:
return ValueInfo(type_, **kwargs)
elif isinstance(metaType, Self):#metaType is Self:
elif metaType is Self:
if SelfType is None:
raise ReflectionException("SelfType is required when metaType is <Self>")
return ValueInfo.Create(SelfType, **kwargs)
@@ -601,7 +601,7 @@ class FieldInfo(MemberInfo):
selfType: type|Any|None = None
):
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ")
super().__init__(
name = name,
@@ -611,7 +611,7 @@ class FieldInfo(MemberInfo):
)
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}")
@property
@@ -746,7 +746,7 @@ class MethodInfo(MemberInfo):
is_class_method: bool,
):
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
PrintColorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})")
MemberInfo.__init__(self, name, ctype, is_static, is_public)
self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType)
@@ -1143,12 +1143,12 @@ class RefType(ValueInfo):
def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]:
if currentType.IsPrimitive:
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return f"{currentType.RealType}"
elif currentType.RealType in type_set:
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return {
"type": f"{currentType.RealType}",
@@ -1156,13 +1156,13 @@ class RefType(ValueInfo):
}
else:
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
type_set.add(currentType.RealType)
value = {}
fields = currentType.GetFields()
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
for field in fields:
value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType))
return {
@@ -1198,7 +1198,7 @@ class RefType(ValueInfo):
# 确保正确地实现所有GetBase*方法
@functools.lru_cache(maxsize=128)
def GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
def _GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
@@ -1206,8 +1206,11 @@ class RefType(ValueInfo):
result.extend(baseType.GetFields(flag))
return result
def GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
return self._GetBaseFields(flag)
@functools.lru_cache(maxsize=128)
def GetAllBaseFields(self) -> List[FieldInfo]:
def _GetAllBaseFields(self) -> List[FieldInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
@@ -1215,9 +1218,12 @@ class RefType(ValueInfo):
result.extend(baseType.GetAllFields())
return result
def GetAllBaseFields(self) -> List[FieldInfo]:
return self._GetAllBaseFields()
# 修改所有的GetBase*方法
@functools.lru_cache(maxsize=128)
def GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
def _GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
@@ -1225,8 +1231,11 @@ class RefType(ValueInfo):
result.extend(baseType.GetMethods(flag))
return result
def GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
return self._GetBaseMethods(flag)
@functools.lru_cache(maxsize=128)
def GetAllBaseMethods(self) -> List[MethodInfo]:
def _GetAllBaseMethods(self) -> List[MethodInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
@@ -1234,8 +1243,11 @@ class RefType(ValueInfo):
result.extend(baseType.GetAllMethods())
return result
def GetAllBaseMethods(self) -> List[MethodInfo]:
return self._GetAllBaseMethods()
@functools.lru_cache(maxsize=128)
def GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
def _GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
@@ -1243,8 +1255,11 @@ class RefType(ValueInfo):
result.extend(baseType.GetMembers(flag))
return result
def GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
return self._GetBaseMembers(flag)
@functools.lru_cache(maxsize=128)
def GetAllBaseMembers(self) -> List[MemberInfo]:
def _GetAllBaseMembers(self) -> List[MemberInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
@@ -1252,6 +1267,9 @@ class RefType(ValueInfo):
result.extend(baseType.GetAllMembers())
return result
def GetAllBaseMembers(self) -> List[MemberInfo]:
return self._GetAllBaseMembers()
def GetFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
self._ensure_initialized()
if flag == RefTypeFlag.Default:
@@ -1411,7 +1429,7 @@ class TypeManager(BaseModel):
if data is None:
raise ReflectionException("data is None")
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
PrintColorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache:
@@ -1436,7 +1454,7 @@ class TypeManager(BaseModel):
# 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type)
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Get "\
PrintColorful(ConsoleFrontColor.YELLOW, f"Get "\
f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\
f"{ConsoleFrontColor.RESET}{ref_type.ToString()}")
return ref_type
@@ -1489,7 +1507,7 @@ class TypeManager(BaseModel):
try:
ref_type = RefType(metaType)
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Create "\
PrintColorful(ConsoleFrontColor.RED, f"Create "\
f"{ConsoleFrontColor.RESET}{metaType} "\
f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}")
self._RefTypes[metaType] = ref_type

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,264 +0,0 @@
# Visual 模块
Visual模块提供了数据可视化和图像处理相关的功能包括数据图表、图像处理、词云等。
## 目录结构
- `Core.py`: 核心数据可视化功能
- `OpenCV.py`: OpenCV图像处理功能
- `WordCloud.py`: 词云生成功能
- `Manim.py`: 数学动画功能
## 功能特性
### 1. 数据可视化 (Core.py)
#### 1.1 基础图表
- 折线图
- 柱状图
- 散点图
- 直方图
- 饼图
- 箱线图
- 热力图
- 分类数据图
- 联合图
#### 1.2 数据处理
- 缺失值处理
- 重复值处理
- 数据标准化
- 数据归一化
### 2. 图像处理 (OpenCV.py)
#### 2.1 图像操作
- 图像加载
- 支持多种格式jpg, png, bmp等
- 支持从文件路径或URL加载
- 支持从内存缓冲区加载
- 图像保存
- 支持多种格式输出
- 支持质量参数设置
- 支持压缩选项
- 图像显示
- 支持窗口标题设置
- 支持窗口大小调整
- 支持键盘事件处理
- 图像转换
- RGB转灰度
- RGB转HSV
- RGB转LAB
- 支持自定义转换矩阵
- 图像缩放
- 支持多种插值方法
- 支持保持宽高比
- 支持指定目标尺寸
- 图像旋转
- 支持任意角度旋转
- 支持旋转中心点设置
- 支持旋转后尺寸调整
- 图像翻转
- 水平翻转
- 垂直翻转
- 对角线翻转
- 图像合并
- 支持多图像拼接
- 支持透明度混合
- 支持蒙版处理
#### 2.2 ImageObject类详解
ImageObject类提供了完整的图像处理功能
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 基本属性
width = image.width # 图像宽度
height = image.height # 图像高度
channels = image.channels # 通道数
dtype = image.dtype # 数据类型
# 图像处理
image.resize_image(800, 600) # 调整大小
image.convert_to_grayscale() # 转换为灰度图
image.filter_gaussian((5, 5), 1.5, 1.5) # 高斯滤波
image.rotate_image(45) # 旋转45度
image.flip_image(horizontal=True) # 水平翻转
# 图像增强
image.adjust_brightness(1.2) # 调整亮度
image.adjust_contrast(1.5) # 调整对比度
image.adjust_saturation(0.8) # 调整饱和度
image.equalize_histogram() # 直方图均衡化
# 边缘检测
image.detect_edges(threshold1=100, threshold2=200) # Canny边缘检测
image.detect_contours() # 轮廓检测
# 特征提取
keypoints = image.detect_keypoints() # 关键点检测
descriptors = image.compute_descriptors() # 描述子计算
# 图像保存
image.save_image("output.jpg", quality=95) # 保存图像
image.save_image("output.png", compression=9) # 保存PNG
# 图像显示
image.show_image("预览") # 显示图像
image.wait_key(0) # 等待按键
# 图像信息
print(image.get_info()) # 获取图像信息
print(image.get_histogram()) # 获取直方图
```
#### 2.3 图像增强
- 边缘检测
- 滤波处理
- 阈值处理
- 形态学操作
- 轮廓检测
- 特征匹配
#### 2.4 视频处理
- 视频读取
- 视频写入
- 摄像头控制
- 帧处理
### 3. 词云生成 (WordCloud.py)
#### 3.1 词云功能
- 词云创建
- 标题设置
- 渲染输出
- 样式定制
### 4. 数学动画 (Manim.py)
#### 4.1 动画功能
- 数学公式动画
- 几何图形动画
- 图表动画
- 场景管理
## 使用示例
### 1. 数据可视化示例
```python
from Convention.Visual import Core
# 创建数据可视化生成器
generator = Core.data_visual_generator("data.csv")
# 绘制折线图
generator.plot_line("x", "y", title="折线图示例")
# 绘制柱状图
generator.plot_bar("category", "value", title="柱状图示例")
# 绘制散点图
generator.plot_scatter("x", "y", title="散点图示例")
# 绘制饼图
generator.plot_pie("category", title="饼图示例")
```
### 2. 图像处理示例
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 图像处理
image.resize_image(800, 600)
image.convert_to_grayscale()
image.filter_gaussian((5, 5), 1.5, 1.5)
# 保存图像
image.save_image("output.jpg")
```
### 3. 词云生成示例
```python
from Convention.Visual import WordCloud
# 创建词云
wordcloud = WordCloud.make_word_cloud("词云", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
])
# 设置标题
WordCloud.set_title(wordcloud, "编程语言词云")
# 渲染输出
WordCloud.render_to(wordcloud, "wordcloud.html")
```
### 4. 视频处理示例
```python
from Convention.Visual import OpenCV
# 创建视频捕获对象
camera = OpenCV.light_cv_camera(0)
# 创建视频写入对象
writer = OpenCV.VideoWriterInstance(
"output.avi",
OpenCV.avi_with_Xvid_fourcc(),
30.0,
(640, 480)
)
# 录制视频
def stop_condition():
return OpenCV.is_current_key('q')
camera.recording(stop_condition, writer)
```
## 依赖项
- matplotlib: 数据可视化
- seaborn: 高级数据可视化
- opencv-python: 图像处理
- pyecharts: 词云生成
- manim: 数学动画
## 注意事项
1. 使用图像处理时注意内存占用
2. 视频处理时注意帧率设置
3. 词云生成时注意数据量
4. 动画制作时注意性能优化
## 性能优化
1. 使用图像处理时注意批量处理
2. 视频处理时使用合适的编码格式
3. 词云生成时控制词数
4. 动画制作时优化渲染设置
## 贡献指南
欢迎提交Issue和Pull Request来改进功能或添加新特性。

View File

@@ -1,66 +0,0 @@
from ..Internal import *
from pyecharts.charts import WordCloud
from pyecharts import options as opts
from pyecharts import types
#from ..File.Core import tool_file, UnWrapper as UnWrapper2Str
def make_word_cloud(
series_name: str,
data_pair: Sequence[Tuple[str, int]],
**kwargs,
):
wordcloud = WordCloud()
wordcloud.add(series_name, data_pair, **kwargs)
return wordcloud
def set_title(
wordcloud: WordCloud,
title: str
):
wordcloud.set_global_opts(
title_opts=opts.TitleOpts(title=title)
)
def render_to(
wordcloud: WordCloud,
file_name: Union[tool_file, str]
):
wordcloud.render(UnWrapper2Str(file_name))
class light_word_cloud(left_value_reference[WordCloud]):
def __init__(
self,
series_name: str,
data_pair: types.Sequence,
**kwargs,
):
super().__init__(make_word_cloud(series_name, data_pair, **kwargs))
def set_title(
self,
title: str
):
set_title(self.ref_value, title)
def render_to(
self,
file_name: Union[tool_file, str]
):
render_to(self.ref_value, file_name)
if __name__ == "__main__":
# 准备数据
wordcloud = make_word_cloud("", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
("Go", 60),
("Rust", 50),
("C#", 40),
("PHP", 30),
("Swift", 20),
("Kotlin", 10),
], word_size_range=[20, 100])
set_title(wordcloud, "cloud")
render_to(wordcloud, "wordcloud.html")

515
Convention/Runtime/Web.py Normal file
View File

@@ -0,0 +1,515 @@
from .Config import *
from .File import ToolFile
import json
import urllib.parse
import urllib.request
import urllib.error
import asyncio
import os
import re
from typing import *
from pydantic import BaseModel
try:
import aiohttp
import aiofiles
except ImportError as e:
ImportingThrow(e, "Web", ["aiohttp", "aiofiles"])
class WebError(Exception):
"""网络操作异常基类"""
pass
class URLValidationError(WebError):
"""URL验证异常"""
pass
class HTTPRequestError(WebError):
"""HTTP请求异常"""
pass
class DownloadError(WebError):
"""下载异常"""
pass
class ToolURL(BaseModel):
"""网络URL工具类提供HTTP客户端和URL操作功能"""
url: str
def __init__(self, url: Union[str, 'ToolURL']):
"""
从URL字符串创建对象
Args:
url: URL字符串或ToolURL对象
"""
if isinstance(url, ToolURL):
url = url.url
super().__init__(url=str(url))
def __str__(self) -> str:
"""隐式字符串转换"""
return self.url
def __bool__(self) -> bool:
"""隐式布尔转换等同于IsValid"""
return self.IsValid
def ToString(self) -> str:
"""获取完整URL"""
return self.url
def GetFullURL(self) -> str:
"""获取完整URL"""
return self.url
@property
def FullURL(self) -> str:
"""获取完整URL属性"""
return self.url
@property
def IsValid(self) -> bool:
"""检查URL是否有效"""
return self.ValidateURL()
def ValidateURL(self) -> bool:
"""
验证URL格式
Returns:
是否为有效的HTTP/HTTPS URL
"""
try:
parsed = urllib.parse.urlparse(self.url)
return parsed.scheme in ('http', 'https') and parsed.netloc != ''
except Exception:
return False
def GetFilename(self) -> str:
"""
获取URL中的文件名
Returns:
URL路径中的文件名
"""
try:
parsed = urllib.parse.urlparse(self.url)
path = parsed.path
if path:
return os.path.basename(path)
return ""
except Exception:
return ""
def GetExtension(self) -> str:
"""
获取文件扩展名
Returns:
文件扩展名(不包含点)
"""
filename = self.GetFilename()
if '.' in filename:
return filename.split('.')[-1].lower()
return ""
def ExtensionIs(self, *extensions: str) -> bool:
"""
检查扩展名是否匹配
Args:
*extensions: 要检查的扩展名列表
Returns:
是否匹配任一扩展名
"""
current_ext = self.GetExtension()
return current_ext in [ext.lower().lstrip('.') for ext in extensions]
def Open(self, url: str) -> 'ToolURL':
"""
在当前对象上打开新URL
Args:
url: 新的URL字符串
Returns:
更新后的ToolURL对象
"""
self.url = str(url)
return self
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件txt, html, htm, css, js, xml, csv"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件jpg, jpeg, png, gif, bmp, svg"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件pdf, doc, docx, xls, xlsx, ppt, pptx"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
# HTTP请求方法
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
with urllib.request.urlopen(self.url) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
data = None
if form_data:
data = urllib.parse.urlencode(form_data).encode('utf-8')
req = urllib.request.Request(self.url, data=data, method='POST')
if form_data:
req.add_header('Content-Type', 'application/x-www-form-urlencoded')
with urllib.request.urlopen(req) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
# 异步HTTP请求方法
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
async with aiohttp.ClientSession() as session:
async with session.post(self.url, data=form_data) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
# 内容加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
with urllib.request.urlopen(self.url) as response:
content = response.read()
# 尝试检测编码
encoding = response.headers.get_content_charset() or 'utf-8'
return content.decode(encoding)
except Exception as e:
raise HTTPRequestError(f"Failed to load text from {self.url}: {str(e)}")
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
return await response.text()
except Exception as e:
raise HTTPRequestError(f"Failed to load text from {self.url}: {str(e)}")
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
with urllib.request.urlopen(self.url) as response:
return response.read()
except Exception as e:
raise HTTPRequestError(f"Failed to load binary from {self.url}: {str(e)}")
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
return await response.read()
except Exception as e:
raise HTTPRequestError(f"Failed to load binary from {self.url}: {str(e)}")
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
text_content = self.LoadAsText()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise HTTPRequestError(f"Failed to parse JSON from {self.url}: {str(e)}")
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise HTTPRequestError(f"Failed to parse JSON from {self.url}: {str(e)}")
# 文件保存和下载功能
def Save(self, local_path: Optional[str] = None) -> ToolFile:
"""
自动选择格式保存到本地
Args:
local_path: 本地保存路径如果为None则自动生成
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
if self.IsText:
return self.SaveAsText(local_path)
elif self.IsJson:
return self.SaveAsJson(local_path)
else:
return self.SaveAsBinary(local_path)
def SaveAsText(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为文本文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
text_content = self.LoadAsText()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(text_content)
return file_obj
def SaveAsJson(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为JSON文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
json_data = self.LoadAsJson()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(json_data)
return file_obj
def SaveAsBinary(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为二进制文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
binary_content = self.LoadAsBinary()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(binary_content)
return file_obj
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
同步下载文件
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
return self.Save(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
file_obj.SaveAsText(content)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
file_obj.SaveAsJson(content)
else:
content = await self.LoadAsBinaryAsync()
file_obj.SaveAsBinary(content)
return file_obj
except Exception as e:
raise DownloadError(f"Failed to download {self.url}: {str(e)}")
# 静态HTTP客户端实例避免连接池耗尽
_http_session: Optional[aiohttp.ClientSession] = None
async def get_http_session() -> aiohttp.ClientSession:
"""获取全局HTTP会话实例"""
global _http_session
if _http_session is None or _http_session.closed:
_http_session = aiohttp.ClientSession()
return _http_session
async def close_http_session():
"""关闭全局HTTP会话"""
global _http_session
if _http_session and not _http_session.closed:
await _http_session.close()
_http_session = None

254
README.md
View File

@@ -0,0 +1,254 @@
# Convention-Python
Convention-Python基于 Convention-Template 规范实现的一套完整的开发工具集。
## 主要内容
### 辅助 (Config.py)
- **内置依赖**: 提供辅助函数与辅助类型
### 架构 (Architecture.py)
- **依赖注入容器**: 支持类型注册、依赖解析和生命周期管理
- **信号系统**: 提供发布-订阅模式的消息通信机制
- **时间线管理**: 支持基于条件的任务队列和执行流程控制
- **单例模式**: 内置单例模型支持
### 异步 (Asynchrony.py)
- **线程管理**: 提供线程实例、原子操作、锁机制
- **并发控制**: 支持线程安全的数据结构和操作
- **异步工具**: 简化异步编程的工具函数
### 配置 (GlobalConfig.py)
- **类型系统**: 强大的类型检查和转换系统
- **调试支持**: 内置调试模式和彩色输出
- **平台兼容**: 跨平台路径和环境管理
- **全局配置**: 统一的配置管理机制
### 文件 (File.py)
- **ToolFile 类**: 强大的文件操作封装
- 支持多种文件格式 (JSON, CSV, Excel, 图像, 音频, Word文档等)
- 文件压缩和解压缩 (ZIP, TAR)
- 文件加密和解密
- 哈希值计算和验证
- 文件监控和备份
- 权限管理
- **批量处理**: 支持文件批量操作和处理
### 序列化 (EasySave.py)
- **序列化支持**: JSON 和二进制格式的序列化
- **反射集成**: 基于反射的对象序列化和反序列化
- **备份机制**: 自动备份和恢复功能
- **字段过滤**: 支持自定义字段选择和忽略规则
### 反射 (Reflection.py)
- **类型管理**: 完整的类型信息管理和缓存
- **成员访问**: 字段和方法的动态访问
- **类型转换**: 灵活的类型转换和验证
- **泛型支持**: 支持泛型类型的处理
### 视觉 (Visual)
#### 可视化 (Visual/Core.py)
- **图表生成**: 支持多种图表类型 (折线图、柱状图、散点图、饼图等)
- **数据处理**: 数据清洗、标准化、归一化
- **样式定制**: 丰富的图表样式和主题选项
#### 图像处理 (Visual/OpenCV.py)
- **ImageObject 类**: 完整的图像处理功能
- **图像增强**: 支持 30+ 种图像增强算法
- **格式转换**: 支持多种图像格式转换
- **批量处理**: 支持图像批量处理和增强
#### 词云生成 (Visual/WordCloud.py)
- **词云创建**: 支持中英文词云生成
- **样式定制**: 丰富的样式和布局选项
### 字符串工具 (String.py)
- **字符串处理**: 长度限制、填充、编码转换
- **中文分词**: 集成 jieba 分词支持
## 安装说明
### 环境要求
- Python >= 3.12
- 操作系统: Windows, Linux, macOS
### 依赖包
运行时自动报告需要被引入的包
调用Config中ReleaseFailed2Requirements函数生成requirements.txt文件
### 安装方式
1. **从源码安装**:
```bash
git clone https://github.com/NINEMINEsigma/Convention-Python.git
cd Convention-Python
pip install -e .
```
2. **直接安装**:
```bash
pip install .
```
3. **打包安装**
```bash
pip install build
python -m build
pip install dist/convention.tar.gz
```
## 🚀 使用示例
### 架构模式示例
```python
from Convention.Runtime import Architecture
# 注册服务
class DatabaseService:
def query(self, sql): return "result"
db_service = DatabaseService()
Architecture.Register(DatabaseService, db_service, lambda: print("DB服务初始化"))
# 获取服务
service = Architecture.Get(DatabaseService)
result = service.query("SELECT * FROM users")
```
### 文件操作示例
```python
from Convention.Runtime import ToolFile
# 创建文件对象
file = ToolFile("data.json")
# 保存和加载 JSON 数据
data = {"name": "张三", "age": 25}
file.SaveAsJson(data)
loaded_data = file.LoadAsJson()
# 文件压缩
compressed = file.Compress("backup.zip")
# 计算哈希值
hash_value = file.calculate_hash("sha256")
```
### 数据序列化示例
```python
from Convention.Runtime import EasySave
from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
email: str
# 保存数据
user = User(name="李四", age=30, email="lisi@example.com")
EasySave.Write(user, "user.json")
# 读取数据
loaded_user = EasySave.Read(User, "user.json")
```
### 数据可视化示例
```python
from Convention.Runtime.Visual import Core
import pandas as pd
# 创建数据可视化生成器
df = pd.read_csv("sales_data.csv")
generator = Core.data_visual_generator("sales_data.csv")
# 绘制图表
generator.plot_line("month", "sales", title="月度销售趋势")
generator.plot_bar("product", "revenue", title="产品收入对比")
generator.plot_pie("category", title="类别分布")
```
### 图像处理示例
```python
from Convention.Runtime.Visual.OpenCV import ImageObject
from Convention.Runtime.Visual.Core import ImageAugmentConfig, ResizeAugmentConfig
# 加载图像
image = ImageObject("input.jpg")
# 图像增强配置
config = ImageAugmentConfig(
resize=ResizeAugmentConfig(width=800, height=600),
lighting=LightingAugmentConfig(lighting=20),
contrast=ContrastAugmentConfig(contrast=1.2)
)
# 批量增强
results = config.augment_from_dir_to("input_dir", "output_dir")
```
## 打包指令
### 构建分发包
```bash
# 清理之前的构建文件
python setup.py clean --all
rm -rf build/ dist/ *.egg-info/
# 构建源码包和轮子包
python setup.py sdist bdist_wheel
# 或使用 build 工具 (推荐)
pip install build
python -m build
```
### 安装本地包
```bash
# 开发模式安装 (可编辑安装)
pip install -e .
# 普通安装
pip install .
```
### 上传到 PyPI
```bash
# 安装上传工具
pip install twine
# 检查包
twine check dist/*
# 上传到测试 PyPI
twine upload --repository-url https://test.pypi.org/legacy/ dist/*
# 上传到正式 PyPI
twine upload dist/*
```
### 创建可执行文件
```bash
# 使用 PyInstaller
pip install pyinstaller
pyinstaller --onefile --name convention-tool your_main_script.py
```
## 许可证
本项目采用 MIT 许可证 - 查看 [LICENSE](LICENSE) 文件了解详情。
## 作者
**LiuBai** - [NINEMINEsigma](https://github.com/NINEMINEsigma)
## 相关链接
- [Convention-Template](https://github.com/NINEMINEsigma/Convention-Template) - 项目模板规范
- [GitHub Issues](https://github.com/NINEMINEsigma/Convention-Python/issues) - 问题反馈
- [GitHub Releases](https://github.com/NINEMINEsigma/Convention-Python/releases) - 版本发布
*最后更新: 2025年9月*

View File

@@ -1,20 +0,0 @@
{
"easy": {
"__type": "__main__.test_log, Global",
"value": {
"__type": "__main__.test_log, Global",
"model_computed_fields": {
"__type": "typing.Any, Global"
},
"model_extra": null,
"model_fields": {
"__type": "typing.Any, Global"
},
"model_fields_set": {
"__type": "typing.Any, Global"
},
"test_field": 1,
"test_field_2": "test"
}
}
}

View File

@@ -1,19 +1,10 @@
import sys
import os
from time import sleep
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Convention.Runtime.Config import *
from Convention.Runtime.EasySave import *
from Convention.Runtime.File import *
class Test:
test_field:int = 10
class_test_field:int = 20
file = ToolFile("[Test]")|"temp"|None
print(file.MustExistsPath())
def __init__(self):
self.test_field:int = 0
def run():
print(Test.__annotations__)
if __name__ == "__main__":
run()

View File

@@ -1,3 +0,0 @@
import math
import r
print(re.findall(r"\d+[.\d]?", "xxxxx$19.99"))