Compare commits

..

19 Commits

Author SHA1 Message Date
007db5a06b Merge branch 'main' of http://www.liubai.site:3000/ninemine/Convention-Python 2025-10-14 19:29:03 +08:00
c11469a108 ToolFile+AbsPath 2025-10-14 19:26:05 +08:00
81209e85ee 增强ToolFile对斜杠的判断与相关行为 2025-10-14 17:09:04 +08:00
97c57f65df Update ToolFile 2025-10-13 17:01:21 +08:00
5b38b6239e Update ToolFile 2025-10-13 16:25:19 +08:00
f146d241eb 1.File新增逐行读取迭代2.Rename PrintColorful 2025-10-13 15:39:29 +08:00
7ff00a8ab9 放弃EP Visual 2025-09-30 14:49:39 +08:00
b02eafcb35 EP Interaction 2025-09-30 10:40:58 +08:00
ecaab13948 EP Interaction 2025-09-29 16:22:10 +08:00
45a6689db8 GlobalConfig新增find字段 2025-09-26 10:34:34 +08:00
3cb7b11756 Try Web.py 2025-09-25 14:26:32 +08:00
4010d9dd8c Update Architecture.py 2025-09-25 11:52:26 +08:00
6331c8e025 忽略.vscode/ 2025-09-22 10:05:03 +08:00
ninemine
8365867823 Delete .vscode directory 2025-09-22 10:00:01 +08:00
494bf300be 修复一些bug 2025-09-20 03:39:51 +08:00
f1197d203b Update README.md 2025-09-09 09:55:22 +08:00
ninemine
f3f3cc8c54 Merge pull request #1 from NINEMINEsigma/EP-Asynchrony
EP Asynchrony
2025-07-25 10:54:01 +08:00
2bb6f924df EP Asynchrony 2025-07-25 10:48:07 +08:00
4d0f24fd0c EP Asynchrony 非因果型异步架构 2025-07-24 11:45:44 +08:00
20 changed files with 2247 additions and 3344 deletions

3
.gitignore vendored
View File

@@ -179,3 +179,6 @@ cython_debug/
# refer to https://docs.cursor.com/context/ignore-files # refer to https://docs.cursor.com/context/ignore-files
.cursorignore .cursorignore
.cursorindexingignore .cursorindexingignore
# IDE
.vscode/

View File

@@ -1,3 +0,0 @@
{
"python.languageServer": "None"
}

View File

@@ -5,28 +5,38 @@ from abc import ABC, abstractmethod
class ISignal(ABC): class ISignal(ABC):
pass pass
class IModel(ABC): class IModel(ABC):
pass
class IDataModel(ABC, IModel):
@abstractmethod @abstractmethod
def Save(self) -> str: def Save(self) -> str:
pass pass
@abstractmethod @abstractmethod
def Load(self, data:str) -> None: def Load(self, data:str) -> None:
pass pass
class IConvertable[T](ABC): class IConvertable[T](ABC):
@abstractmethod @abstractmethod
def ConvertTo(self) -> T: def ConvertTo(self) -> T:
pass pass
class IConvertModel[T](IConvertable[T], IModel): class IConvertModel[T](IConvertable[T], IModel):
pass pass
class SingletonModel[T](IModel): class SingletonModel[T](IModel):
_InjectInstances:Dict[type,Any] = {} _InjectInstances:Dict[type,Any] = {}
@staticmethod @staticmethod
def GetInstance(t:Typen[T]) -> T: def GetInstance(t:Typen[T]) -> T:
return SingletonModel._InjectInstances[t] return SingletonModel._InjectInstances[t]
@staticmethod @staticmethod
def SetInstance(t:Typen[T], obj:T) -> None: def SetInstance(t:Typen[T], obj:T) -> None:
SingletonModel._InjectInstances[t] = obj SingletonModel._InjectInstances[t] = obj
@@ -34,9 +44,6 @@ class SingletonModel[T](IModel):
def __init__(self, t:Typen[T]) -> None: def __init__(self, t:Typen[T]) -> None:
self.typen: type = t self.typen: type = t
@override
def Save(self) -> str:
return SingletonModel.GetInstance(self.typen).Save()
class DependenceModel(IConvertModel[bool]): class DependenceModel(IConvertModel[bool]):
def __init__(self, queries:Sequence[IConvertModel[bool]]) -> None: def __init__(self, queries:Sequence[IConvertModel[bool]]) -> None:
@@ -52,14 +59,10 @@ class DependenceModel(IConvertModel[bool]):
def __iter__(self): def __iter__(self):
return iter(self.queries) return iter(self.queries)
def Load(self, data:str):
raise NotImplementedError()
def Save(self) -> str:
raise NotImplementedError()
SignalListener = Callable[[ISignal], None] SignalListener = Callable[[ISignal], None]
class Architecture: class Architecture:
@staticmethod @staticmethod
def FormatType(t:type) -> str: def FormatType(t:type) -> str:
@@ -78,14 +81,11 @@ class Architecture:
@classmethod @classmethod
def InternalReset(cls) -> None: def InternalReset(cls) -> None:
# Register System # Register System
cls._RegisterHistory.clear() cls._RegisteredObjects.clear()
cls._UncompleteTargets.clear() cls._RegisteringRuntime.clear()
cls._Completer.clear() # Signal Listener
cls._Dependences.clear()
cls._Childs.clear()
# Event Listener
cls._SignalListener.clear() cls._SignalListener.clear()
# Linear Chain for Dependence # Timeline/Chain
cls._TimelineQueues.clear() cls._TimelineQueues.clear()
cls._TimelineContentID = 0 cls._TimelineContentID = 0
@@ -97,136 +97,79 @@ class Architecture:
@override @override
def ConvertTo(self) -> bool: def ConvertTo(self) -> bool:
return self._queryType in Architecture._Childs return self._queryType in Architecture._RegisteredObjects
def Load(self, data:str) -> None:
raise NotImplementedError()
def Save(self) -> str:
raise NotImplementedError()
_RegisterHistory: Set[type] = set()
_UncompleteTargets: Dict[type,Any] = {}
_Completer: Dict[type,Action] = {}
_Dependences: Dict[type,DependenceModel] = {}
_Childs: Dict[type,Any] = {}
class Registering(IConvertModel[bool]): class Registering(IConvertModel[bool]):
def __init__(self,registerSlot:type) -> None: def __init__(self, registerSlot:type, target:Any, dependences:DependenceModel, action:Action) -> None:
self._registerSlot:type = registerSlot self.registerSlot = registerSlot
self.target = target
self.dependences = dependences
self.action = action
@override @override
def ConvertTo(self) -> bool: def ConvertTo(self) -> bool:
return self._registerSlot in Architecture._Childs return self.dependences.ConvertTo()
@override _RegisteringRuntime: Dict[type, Registering] = {}
def Load(self,data:str) -> None: _RegisteredObjects: Dict[type, Any] = {}
raise InvalidOperationError(f"Cannot use {self.__class__.__name__} to load type")
@override
def Save(self) -> str:
return f"{Architecture.FormatType(self._registerSlot)}[{self.ConvertTo()}]"
@classmethod @classmethod
def _InternalRegisteringComplete(cls) -> tuple[bool,Set[type]]: def _InternalRegisteringComplete(cls) -> None:
resultSet: Set[type] = set() CompletedSet: Set[Architecture.Registering] = set()
stats: bool = False for dependence in cls._RegisteringRuntime.keys():
for dependence in cls._Dependences.keys(): if cls._RegisteringRuntime[dependence].dependences.ConvertTo():
if cls._Dependences[dependence].ConvertTo(): CompletedSet.add(cls._RegisteringRuntime[dependence])
resultSet.add(dependence) for complete in CompletedSet:
stats = True del cls._RegisteringRuntime[complete.registerSlot]
return stats,resultSet complete.action()
cls._RegisteredObjects[complete.registerSlot] = complete.target
if len(CompletedSet) > 0:
cls._InternalRegisteringComplete()
@classmethod @classmethod
def _InternalRegisteringUpdate(cls, internalUpdateBuffer:Set[type]): def Register(cls, slot:type, target:Any, action:Action, *dependences:type) -> DependenceModel:
for complete in internalUpdateBuffer: if slot in cls._RegisteringRuntime:
cls._Dependences.pop(complete, None)
for complete in internalUpdateBuffer:
cls._Completer[complete]()
cls._Completer.pop(complete, None)
for complete in internalUpdateBuffer:
cls._Childs[complete] = cls._UncompleteTargets[complete]
cls._UncompleteTargets.pop(complete, None)
@classmethod
def Register(cls, slot:type, target:Any, completer:Action, *dependences:type) -> 'Architecture.Registering':
if slot in cls._RegisterHistory:
raise InvalidOperationError("Illegal duplicate registrations") raise InvalidOperationError("Illegal duplicate registrations")
cls._RegisteringRuntime[slot] = Architecture.Registering(slot, target, DependenceModel(Architecture.TypeQuery(dependence) for dependence in dependences), action)
cls._RegisterHistory.add(slot) cls._InternalRegisteringComplete()
cls._Completer[slot] = completer return cls._RegisteringRuntime[slot].dependences
cls._UncompleteTargets[slot] = target
# 过滤掉自身依赖
filtered_deps = [dep for dep in dependences if dep != slot]
type_queries = [cls.TypeQuery(dep) for dep in filtered_deps]
cls._Dependences[slot] = DependenceModel(type_queries)
while True:
has_complete, buffer = cls._InternalRegisteringComplete()
if not has_complete:
break
cls._InternalRegisteringUpdate(buffer)
return cls.Registering(slot)
@classmethod
def RegisterGeneric[T](cls, target:T, completer:Action, *dependences:type) -> 'Architecture.Registering':
return cls.Register(type(target), target, completer, *dependences)
@classmethod @classmethod
def Contains(cls, type_:type) -> bool: def Contains(cls, type_:type) -> bool:
return type_ in cls._Childs return type_ in cls._RegisteredObjects
@classmethod
def ContainsGeneric[T](cls) -> bool:
return cls.Contains(type(T))
@classmethod
def InternalGet(cls, type_:type) -> Any:
return cls._Childs[type_]
@classmethod @classmethod
def Get(cls, type_:type) -> Any: def Get(cls, type_:type) -> Any:
return cls.InternalGet(type_) return cls._RegisteredObjects[type_]
@classmethod @classmethod
def GetGeneric[T](cls) -> T: def Unregister(cls, slot:type) -> bool:
return cls.Get(type(T)) if slot in cls._RegisteredObjects:
del cls._RegisteredObjects[slot]
return True
if slot in cls._RegisteringRuntime:
del cls._RegisteringRuntime[slot]
return True
return False
#endregion #endregion
#region Signal & Update #region Signal & Update
_SignalListener: Dict[type, Set[SignalListener]] = {} _SignalListener: Dict[type, List[SignalListener]] = {}
class Listening:
def __init__(self, action:SignalListener, type_:type):
self._action = action
self._type = type_
def StopListening(self):
if self._type in Architecture._SignalListener:
Architecture._SignalListener[self._type].discard(self._action)
@classmethod @classmethod
def AddListenerGeneric[Signal](cls, slot:type, listener:SignalListener) -> 'Architecture.Listening': def AddListener(cls, slot:type, listener:SignalListener) -> None:
if slot not in cls._SignalListener: if slot not in cls._SignalListener:
cls._SignalListener[slot] = set() cls._SignalListener[slot] = []
def action(signal:ISignal): cls._SignalListener[slot].append(listener)
if isinstance(signal, slot):
listener(signal)
result = cls.Listening(action, slot)
cls._SignalListener[slot].add(action)
return result
@classmethod @classmethod
def SendMessage(cls, slot:type, signal:ISignal): def SendMessage(cls, slot:type, signal:ISignal):
if slot in cls._SignalListener: if slot in cls._SignalListener:
for action in cls._SignalListener[slot]: for listener in cls._SignalListener[slot]:
action(signal) listener(signal)
#endregion #endregion
@@ -285,6 +228,3 @@ class Architecture:
#endregion #endregion

View File

@@ -0,0 +1,353 @@
from .Config import *
from .Reflection import *
from collections import defaultdict
import asyncio
import threading
from typing import Optional
from pydantic import BaseModel
from abc import ABC, abstractmethod
class AsyncContextDetector:
"""异步上下文检测工具类"""
@staticmethod
def is_in_async_context() -> bool:
"""检查是否在异步上下文中运行"""
try:
asyncio.current_task()
return True
except RuntimeError:
return False
@staticmethod
def get_current_loop() -> Optional[asyncio.AbstractEventLoop]:
"""获取当前事件循环如果没有则返回None"""
try:
return asyncio.get_running_loop()
except RuntimeError:
return None
@staticmethod
def ensure_async_context_safe(operation_name: str) -> None:
"""确保在异步上下文中执行是安全的"""
if AsyncContextDetector.is_in_async_context():
raise RuntimeError(
f"Cannot perform '{operation_name}' from within an async context. "
f"Use await or async methods instead."
)
class AsyncFieldAccessor:
"""异步字段访问器,封装字段访问逻辑"""
def __init__(
self,
async_fields: Dict[str, 'AsynchronyExpression'],
origin_fields: Dict[str, FieldInfo]
) -> None:
self._async_fields = async_fields
self._origin_fields = origin_fields
async def get_field_value_async(self, field_name: str):
"""异步获取字段值"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
return await self._async_fields[field_name].get_value()
def get_field_value_sync(self, field_name: str):
"""同步获取字段值(仅在非异步上下文中使用)"""
AsyncContextDetector.ensure_async_context_safe(f"sync access to field '{field_name}'")
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
async_expr = self._async_fields[field_name]
if not async_expr.is_initialize and async_expr.timeout > 0:
# 需要等待但在同步上下文中使用run_async
return run_async(async_expr.get_value())
elif not async_expr.is_initialize:
raise RuntimeError(f"Field '{field_name}' is not initialized and has no timeout")
else:
return run_async(async_expr.get_value())
def is_field_initialized(self, field_name: str) -> bool:
"""检查字段是否已初始化"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
return self._async_fields[field_name].is_initialize
def set_field_value(self, field_name: str, value: Any) -> None:
"""设置字段值"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
self._async_fields[field_name].set_value(value)
class AsynchronyUninitialized:
"""表示未初始化状态的单例类"""
__instance__ = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if cls.__instance__ is None:
with cls._lock:
if cls.__instance__ is None:
cls.__instance__ = super().__new__(cls)
return cls.__instance__
def __repr__(self):
return "uninitialized"
def __str__(self):
return "None"
class AsynchronyExpression:
def __init__(
self,
field: FieldInfo,
value: Any = AsynchronyUninitialized(),
*,
time_wait: float = 0.1,
timeout: float = 0,
callback: Optional[Action] = None,
):
'''
参数:
field: 字段
value: 初始化, 默认为AsynchronyUninitialized, 即无初始化
time_wait: 等待时间, 默认为0.1秒
timeout: 超时时间, 默认为0秒
callback: 回调函数, 默认为None, 当状态为无初始化时get_value会调用callback
'''
self.field = field
self._value = value
self.callback = callback
self.is_initialize = not isinstance(value, AsynchronyUninitialized)
self.time_wait = time_wait
self.timeout = timeout
def get_value_sync(self):
if self.is_initialize:
return self._value
elif self.callback is not None:
self.callback()
if self.is_initialize:
return self._value
else:
raise RuntimeError(f"Field {self.field.FieldName} is not initialized")
async def get_value(self):
"""异步获取字段值,改进的超时机制"""
if self.is_initialize:
return self._value
elif self.callback is not None:
self.callback()
if self.timeout > 0:
try:
# 使用 asyncio.wait_for 提供更精确的超时控制
async def wait_for_initialization():
while not self.is_initialize:
await asyncio.sleep(self.time_wait)
return self._value
return await asyncio.wait_for(wait_for_initialization(), timeout=self.timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"Timeout waiting for uninitialized field {self.field.FieldName}")
else:
# 无超时,一直等待
while not self.is_initialize:
await asyncio.sleep(self.time_wait)
return self._value
def set_value(self, value: Any) -> None:
"""设置字段值"""
if isinstance(value, AsynchronyUninitialized):
self.set_uninitialized()
elif self.field.Verify(type(value)):
self._value = value
self.is_initialize = True
else:
raise ValueError(f"Value {value} is not valid for field {self.field.FieldName}")
def SetUninitialized(self) -> None:
"""设置为未初始化状态(保持兼容性的旧方法名)"""
self.set_uninitialized()
def set_uninitialized(self) -> None:
"""设置为未初始化状态"""
if self.is_initialize:
del self._value
self._value = AsynchronyUninitialized()
self.is_initialize = False
class Asynchronous(ABC):
__Asynchronous_Origin_Fields__: Dict[Type, Dict[str, FieldInfo]] = defaultdict(dict)
_fields_lock = threading.Lock()
def _GetAsynchronousOriginFields(self) -> Dict[str, FieldInfo]:
return Asynchronous.__Asynchronous_Origin_Fields__[type(self)]
def __init__(self, **kwargs: Dict[str, dict]):
super().__init__()
self.__Asynchronous_Fields__: Dict[str, AsynchronyExpression] = {}
# 使用线程锁保护类变量访问
with Asynchronous._fields_lock:
origin_fields = self._GetAsynchronousOriginFields()
for field_info in TypeManager.GetInstance().CreateOrGetRefTypeFromType(type(self)).GetAllFields():
if field_info.FieldName == "__Asynchronous_Origin_Fields__":
continue
origin_fields[field_info.FieldName] = field_info
self.__Asynchronous_Fields__[field_info.FieldName] = AsynchronyExpression(
field_info, **kwargs.get(field_info.FieldName, {})
)
# 创建字段访问器以提升性能
self._field_accessor = AsyncFieldAccessor(self.__Asynchronous_Fields__, origin_fields)
def __getattribute__(self, name: str) -> Any:
# 快速路径:非异步字段直接返回
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
return super().__getattribute__(name)
# 一次性获取所需属性,避免重复调用
try:
field_accessor:AsyncFieldAccessor = super().__getattribute__("_field_accessor")
origin_fields:Dict[str, FieldInfo] = super().__getattribute__("_GetAsynchronousOriginFields")()
except AttributeError:
# 对象可能尚未完全初始化
return super().__getattribute__(name)
if name in origin_fields:
# 这是一个异步字段
if AsyncContextDetector.is_in_async_context():
# 在异步上下文中,提供友好的错误提示
async_fields:Dict[str, AsynchronyExpression] = super().__getattribute__("__Asynchronous_Fields__")
async_expr = async_fields[name]
if not async_expr.is_initialize:
timeout_info = f" with {async_expr.timeout}s timeout" if async_expr.timeout > 0 else ""
raise RuntimeError(
f"Field '{name}' is not initialized{timeout_info}. "
)
else:
# 字段已初始化,直接返回值
return async_expr.get_value_sync()
else:
# 在同步上下文中,使用字段访问器
try:
return field_accessor.get_field_value_sync(name)
except RuntimeError as e:
if "Cannot perform" in str(e):
# 重新包装错误信息,提供更友好的提示
raise RuntimeError(
f"Cannot access async field '{name}' from sync context when it requires initialization. "
f"Use async context or ensure field is pre-initialized."
) from e
else:
raise
return super().__getattribute__(name)
def __setattr__(self, name: str, value: Any) -> None:
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
super().__setattr__(name, value)
elif hasattr(self, '_field_accessor'):
# 对象已初始化,使用字段访问器
try:
field_accessor = super().__getattribute__("_field_accessor")
field_accessor.set_field_value(name, value)
return
except AttributeError:
# 不是异步字段
pass
super().__setattr__(name, value)
def __delattr__(self, name: str) -> None:
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
super().__delattr__(name)
elif hasattr(self, '_field_accessor'):
# 对象已初始化,使用字段访问器
try:
field_accessor = super().__getattribute__("_field_accessor")
origin_fields = super().__getattribute__("_GetAsynchronousOriginFields")()
if name in origin_fields:
async_fields = super().__getattribute__("__Asynchronous_Fields__")
async_fields[name].set_uninitialized()
return
except AttributeError:
# 不是异步字段
pass
super().__delattr__(name)
def is_field_initialized(self, field_name: str) -> bool:
"""检查字段是否已初始化"""
return self._field_accessor.is_field_initialized(field_name)
def run_until_complete(coro: Coroutine) -> Any:
"""Gets an existing event loop to run the coroutine.
If there is no existing event loop, creates a new one.
"""
try:
# Check if there's an existing event loop
loop = asyncio.get_event_loop()
# If we're here, there's an existing loop but it's not running
return loop.run_until_complete(coro)
except RuntimeError:
# If we can't get the event loop, we're likely in a different thread, or its already running
try:
return asyncio.run(coro)
except RuntimeError:
raise RuntimeError(
"Detected nested async. Please use nest_asyncio.apply() to allow nested event loops."
"Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc."
)
def run_async_coroutine(coro: Coroutine) -> Any:
try:
# Check if there's an existing event loop
loop = asyncio.get_event_loop()
# If we're here, there's an existing loop but it's not running
return loop.create_task(coro)
except RuntimeError:
# If we can't get the event loop, we're likely in a different thread, or its already running
try:
return asyncio.run(coro)
except RuntimeError:
raise RuntimeError(
"Detected nested async. Please use nest_asyncio.apply() to allow nested event loops."
"Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc."
)
def run_async(coro: Coroutine):
"""安全地运行异步协程,避免事件循环死锁"""
# 使用统一的异步上下文检测
AsyncContextDetector.ensure_async_context_safe("run_async")
# 尝试获取当前事件循环
current_loop = AsyncContextDetector.get_current_loop()
if current_loop is not None and not current_loop.is_running():
# 有事件循环但未运行,直接使用
return current_loop.run_until_complete(coro)
elif current_loop is None:
# 没有事件循环,创建新的
try:
return asyncio.run(coro)
except RuntimeError as e:
raise RuntimeError(
"Failed to run async coroutine. "
"Please ensure proper async environment or use nest_asyncio.apply() for nested loops."
) from e
else:
# 事件循环正在运行这种情况应该被AsyncContextDetector捕获
raise RuntimeError(
"Unexpected state: running event loop detected but context check passed. "
"This should not happen."
)

View File

@@ -5,10 +5,35 @@ import sys
import threading import threading
import traceback import traceback
import datetime import datetime
import platform try:
import time
import os
from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle
except:
print("colorama is not installed")
class ConsoleFrontColor:
RED = ""
GREEN = ""
YELLOW = ""
BLUE = ""
MAGENTA = ""
CYAN = ""
WHITE = ""
RESET = ""
class ConsoleBackgroundColor:
RED = ""
GREEN = ""
YELLOW = ""
BLUE = ""
MAGENTA = ""
CYAN = ""
WHITE = ""
RESET = ""
class ConsoleStyle:
RESET = ""
BOLD = ""
DIM = ""
UNDERLINE = ""
REVERSE = ""
HIDDEN = ""
class NotImplementedError(Exception): class NotImplementedError(Exception):
def __init__(self, message:Optional[str]=None) -> None: def __init__(self, message:Optional[str]=None) -> None:
@@ -35,7 +60,7 @@ def GetInternalDebug() -> bool:
global INTERNAL_DEBUG global INTERNAL_DEBUG
return INTERNAL_DEBUG return INTERNAL_DEBUG
def print_colorful(color:str, *args, is_reset:bool=True, **kwargs): def PrintColorful(color:str, *args, is_reset:bool=True, **kwargs):
with lock_guard(): with lock_guard():
if is_reset: if is_reset:
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs) print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
@@ -303,6 +328,12 @@ class PlatformIndicator:
CompanyName : str = "DefaultCompany" CompanyName : str = "DefaultCompany"
ProductName : str = "DefaultProject" ProductName : str = "DefaultProject"
@staticmethod
def GetFileSeparator(is_not_this_platform:bool = False) -> str:
if PlatformIndicator.IsPlatformWindows and not is_not_this_platform:
return "\\"
return "/"
@staticmethod @staticmethod
def GetApplicationPath() -> str: def GetApplicationPath() -> str:
"""获取应用程序所在目录""" """获取应用程序所在目录"""
@@ -350,3 +381,25 @@ class DescriptiveIndicator[T]:
self.descripion : str = description self.descripion : str = description
self.value : T = value self.value : T = value
class Switch:
def __init__(self, value, isThougth = False) -> None:
self.value = value
self.isThougth = False
self.caseStats = False
self.result = None
def Case(self, caseValue, callback:Callable[[], Any]) -> 'Switch':
if self.caseStats and self.isThougth:
self.result = callback()
elif caseValue == self.value:
self.caseStats = True
self.result = callback()
return self
def Default(self, callback:Callable[[], Any]) -> Any:
if self.caseStats and self.isThougth:
self.result = callback()
elif self.caseStats == False:
self.caseStats = True
self.result = callback()
return self.result

View File

@@ -180,14 +180,14 @@ class ESReader(BaseModel):
''' '''
#module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.') #module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.')
#if GetInternalEasySaveDebug(): #if GetInternalEasySaveDebug():
# print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\ # PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
# f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\ # f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\
# f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}") # f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}")
#typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name) #typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name)
#return TypeManager.GetInstance().CreateOrGetRefType(typen_to) #return TypeManager.GetInstance().CreateOrGetRefType(typen_to)
typen, assembly_name = ReadAssemblyTypen(type_label) typen, assembly_name = ReadAssemblyTypen(type_label)
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\ f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\
f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}") f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}")
return TypeManager.GetInstance().CreateOrGetRefType(typen) return TypeManager.GetInstance().CreateOrGetRefType(typen)
@@ -235,7 +235,7 @@ class ESReader(BaseModel):
if rtype is None: if rtype is None:
raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}") raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}")
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\
f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}") f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}")
# 处理值类型 # 处理值类型
@@ -278,7 +278,7 @@ class ESReader(BaseModel):
else: else:
rinstance = rtype.CreateInstance() rinstance = rtype.CreateInstance()
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\
f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}") f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}")
fields:List[FieldInfo] = self._GetFields(rtype) fields:List[FieldInfo] = self._GetFields(rtype)
for field in fields: for field in fields:
@@ -289,19 +289,19 @@ class ESReader(BaseModel):
if field.FieldType == list and field.ValueType.IsGeneric: if field.FieldType == list and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == set and field.ValueType.IsGeneric: elif field.FieldType == set and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == tuple and field.ValueType.IsGeneric: elif field.FieldType == tuple and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == dict and field.ValueType.IsGeneric: elif field.FieldType == dict and field.ValueType.IsGeneric:
@@ -309,13 +309,13 @@ class ESReader(BaseModel):
DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1]) DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1])
) )
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\
f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>") f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>")
else: else:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\
f"<{field_rtype.GenericArgs}>") f"<{field_rtype.GenericArgs}>")
field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName])) field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName]))

View File

@@ -1,10 +1,8 @@
import os.path
from .Config import * from .Config import *
import json import json
import shutil import shutil
import pandas as pd
import os import os
import sys
import pickle
import zipfile import zipfile
import tarfile import tarfile
import base64 import base64
@@ -14,21 +12,6 @@ import datetime
import stat import stat
from typing import * from typing import *
from pathlib import Path from pathlib import Path
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
from .String import Bytes2String
def GetExtensionName(file:str): def GetExtensionName(file:str):
return os.path.splitext(file)[1][1:] return os.path.splitext(file)[1][1:]
@@ -67,8 +50,10 @@ class PermissionError(FileOperationError):
"""权限操作异常""" """权限操作异常"""
pass pass
from pydantic import BaseModel, GetCoreSchemaHandler, Field try:
from pydantic_core import core_schema from pydantic import BaseModel
except ImportError as e:
ImportingThrow(e, "File", ["pydantic"])
class ToolFile(BaseModel): class ToolFile(BaseModel):
OriginFullPath:str OriginFullPath:str
@@ -77,7 +62,10 @@ class ToolFile(BaseModel):
self, self,
filePath: Union[str, Self], filePath: Union[str, Self],
): ):
super().__init__(OriginFullPath=os.path.abspath(os.path.expandvars(str(filePath)))) filePath = os.path.expandvars(str(filePath))
if ":" in filePath:
filePath = os.path.abspath(filePath)
super().__init__(OriginFullPath=filePath)
def __del__(self): def __del__(self):
pass pass
def __str__(self): def __str__(self):
@@ -89,9 +77,21 @@ class ToolFile(BaseModel):
def __or__(self, other): def __or__(self, other):
if other is None: if other is None:
return ToolFile(self.GetFullPath() if self.IsDir() else self.GetFullPath()+"\\") return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}{PlatformIndicator.GetFileSeparator()}")
else: else:
return ToolFile(os.path.join(self.GetFullPath(), str(other))) # 不使用os.path.join因为os.path.join存在如下机制
# 当参数路径中存在绝对路径风格时,会忽略前面的参数,例如:
# os.path.join("E:/dev", "/analyze/") = "E:/analyze/"
# 而我们需要的是 "E:/dev/analyze"
separator = PlatformIndicator.GetFileSeparator()
separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
first = self.GetFullPath().replace(separator_not_this_platform,separator).strip(separator)
second = str(other).replace(separator_not_this_platform,separator)
if first == "./":
return ToolFile(f"{second}")
elif first == "../":
first = ToolFile(f"{os.path.abspath(first)}").BackToParentDir()
return ToolFile(f"{first}{separator}{second}")
def __idiv__(self, other): def __idiv__(self, other):
temp = self.__or__(other) temp = self.__or__(other)
self.OriginFullPath = temp.GetFullPath() self.OriginFullPath = temp.GetFullPath()
@@ -114,15 +114,18 @@ class ToolFile(BaseModel):
other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other) other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other)
self_path = self.OriginFullPath self_path = self.OriginFullPath
# 标准化路径,移除末尾的斜线 separator = PlatformIndicator.GetFileSeparator()
if self_path.endswith('/') or self_path.endswith('\\'): separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
self_path = self_path[:-1]
if other_path.endswith('/') or other_path.endswith('\\'):
other_path = other_path[:-1]
# 使用系统的路径规范化函数进行比较
return os.path.normpath(self_path) == os.path.normpath(other_path)
# 如果两个文件都存在,则直接比较路径
if self.Exists() == True and other.Exists() == True:
return self_path.strip(separator_not_this_platform) == other_path.strip(separator_not_this_platform)
# 如果一个文件存在另一个不被判定为存在则一定不同
elif self.Exists() != other.Exists():
return False
# 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串
else:
return self_path.replace(separator_not_this_platform,separator) == other_path.replace(separator_not_this_platform,separator)
def ToPath(self): def ToPath(self):
return Path(self.OriginFullPath) return Path(self.OriginFullPath)
@@ -172,7 +175,7 @@ class ToolFile(BaseModel):
if self.Exists() is False: if self.Exists() is False:
raise FileNotFoundError("file not found") raise FileNotFoundError("file not found")
newpath = str(newpath) newpath = str(newpath)
if '\\' in newpath or '/' in newpath: if PlatformIndicator.GetFileSeparator() in newpath or PlatformIndicator.GetFileSeparator(True) in newpath:
newpath = GetBaseFilename(newpath) newpath = GetBaseFilename(newpath)
new_current_path = os.path.join(self.GetDir(), newpath) new_current_path = os.path.join(self.GetDir(), newpath)
os.rename(self.OriginFullPath, new_current_path) os.rename(self.OriginFullPath, new_current_path)
@@ -183,16 +186,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r', encoding=encoding) as f: with open(self.OriginFullPath, 'r', encoding=encoding) as f:
json_data = json.load(f, **kwargs) json_data = json.load(f, **kwargs)
return json_data return json_data
def LoadAsCsv(self) -> pd.DataFrame: def LoadAsCsv(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f) return pd.read_csv(f)
def LoadAsXml(self) -> pd.DataFrame: def LoadAsXml(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_xml(f) return pd.read_xml(f)
def LoadAsDataframe(self) -> pd.DataFrame: def LoadAsDataframe(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f) return pd.read_csv(f)
def LoadAsExcel(self) -> pd.DataFrame: def LoadAsExcel(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_excel(f) return pd.read_excel(f)
def LoadAsBinary(self) -> bytes: def LoadAsBinary(self) -> bytes:
@@ -202,18 +221,103 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return f.read() return f.read()
def LoadAsWav(self): def LoadAsWav(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_wav(self.OriginFullPath) return AudioSegment.from_wav(self.OriginFullPath)
def LoadAsAudio(self): def LoadAsAudio(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_file(self.OriginFullPath) return AudioSegment.from_file(self.OriginFullPath)
def LoadAsImage(self) -> ImageFile.ImageFile: def LoadAsImage(self):
try:
from PIL import Image
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
return Image.open(self.OriginFullPath) return Image.open(self.OriginFullPath)
def LoadAsDocx(self) -> DocumentObject: def LoadAsDocx(self) -> "docx.document.Document":
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
return Document(self.OriginFullPath) return Document(self.OriginFullPath)
def LoadAsUnknown(self, suffix:str) -> Any: def LoadAsUnknown(self, suffix:str) -> Any:
return self.LoadAsText() return self.LoadAsText()
def LoadAsModel(self, model:type[BaseModel]) -> BaseModel: def LoadAsModel(self, model:type["BaseModel"]) -> "BaseModel":
return model.model_validate(self.LoadAsJson()) return model.model_validate(self.LoadAsJson())
def ReadLines(self, **kwargs):
with open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = f.readline()
if not line or line == '':
break
yield line
async def ReadLinesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = await f.readline()
if not line or line == '':
break
yield line
def ReadBytes(self, **kwargs):
with open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = f.read(1024)
if not data or data == '':
break
yield data
async def ReadBytesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = await f.read(1024)
if not data or data == '':
break
yield data
def WriteBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'wb', **kwargs) as f:
f.write(data)
async def WriteBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'wb', **kwargs) as f:
await f.write(data)
def WriteLines(self, data:List[str], **kwargs):
with open(self.OriginFullPath, 'w', **kwargs) as f:
f.writelines(data)
async def WriteLinesAsync(self, data:List[str], **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'w', **kwargs) as f:
await f.writelines(data)
def AppendText(self, data:str, **kwargs):
with open(self.OriginFullPath, 'a', **kwargs) as f:
f.write(data)
async def AppendTextAsync(self, data:str, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'a', **kwargs) as f:
await f.write(data)
def AppendBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'ab', **kwargs) as f:
f.write(data)
async def AppendBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'ab', **kwargs) as f:
await f.write(data)
def SaveAsJson(self, json_data): def SaveAsJson(self, json_data):
try: try:
from pydantic import BaseModel from pydantic import BaseModel
@@ -225,16 +329,40 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w', encoding='utf-8') as f: with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=4) json.dump(json_data, f, indent=4)
return self return self
def SaveAsCsv(self, csv_data:pd.DataFrame): def SaveAsCsv(self, csv_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
csv_data.to_csv(self.OriginFullPath) csv_data.to_csv(self.OriginFullPath)
return self return self
def SaveAsXml(self, xml_data:pd.DataFrame): def SaveAsXml(self, xml_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
xml_data.to_xml(self.OriginFullPath) xml_data.to_xml(self.OriginFullPath)
return self return self
def SaveAsDataframe(self, dataframe_data:pd.DataFrame): def SaveAsDataframe(self, dataframe_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
dataframe_data.to_csv(self.OriginFullPath) dataframe_data.to_csv(self.OriginFullPath)
return self return self
def SaveAsExcel(self, excel_data:pd.DataFrame): def SaveAsExcel(self, excel_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
excel_data.to_excel(self.OriginFullPath, index=False) excel_data.to_excel(self.OriginFullPath, index=False)
return self return self
def SaveAsBinary(self, binary_data:bytes): def SaveAsBinary(self, binary_data:bytes):
@@ -245,13 +373,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w') as f: with open(self.OriginFullPath, 'w') as f:
f.writelines(text_data) f.writelines(text_data)
return self return self
def SaveAsAudio(self, audio_data:AudioSegment): def SaveAsAudio(self, audio_data:"AudioSegment"):
'''
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
'''
audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath)) audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath))
return self return self
def SaveAsImage(self, image_data:ImageFile.ImageFile): def SaveAsImage(self, image_data:"ImageFile.ImageFile"):
'''
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
'''
image_data.save(self.OriginFullPath) image_data.save(self.OriginFullPath)
return self return self
def SaveAsDocx(self, docx_data:DocumentObject): def SaveAsDocx(self, docx_data:"DocumentObject"):
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
docx_data.save(self.OriginFullPath) docx_data.save(self.OriginFullPath)
return self return self
def SaveAsUnknown(self, unknown_data:Any): def SaveAsUnknown(self, unknown_data:Any):
@@ -267,6 +414,8 @@ class ToolFile(BaseModel):
return os.path.getsize(self.OriginFullPath) return os.path.getsize(self.OriginFullPath)
def GetExtension(self): def GetExtension(self):
return GetExtensionName(self.OriginFullPath) return GetExtensionName(self.OriginFullPath)
def GetAbsPath(self) -> str:
return os.path.abspath(self.OriginFullPath)
def GetFullPath(self) -> str: def GetFullPath(self) -> str:
return self.OriginFullPath return self.OriginFullPath
def GetFilename(self, is_without_extension = False): def GetFilename(self, is_without_extension = False):
@@ -276,7 +425,7 @@ class ToolFile(BaseModel):
''' '''
if is_without_extension and '.' in self.OriginFullPath: if is_without_extension and '.' in self.OriginFullPath:
return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)] return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)]
elif self.OriginFullPath[-1] == '/' or self.OriginFullPath[-1] == '\\': elif self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator(True):
return GetBaseFilename(self.OriginFullPath[:-1]) return GetBaseFilename(self.OriginFullPath[:-1])
else: else:
return GetBaseFilename(self.OriginFullPath) return GetBaseFilename(self.OriginFullPath)
@@ -288,7 +437,7 @@ class ToolFile(BaseModel):
return os.path.dirname(self.OriginFullPath) return os.path.dirname(self.OriginFullPath)
def IsDir(self): def IsDir(self):
if self.OriginFullPath[-1] == '\\' or self.GetFullPath()[-1] == '/': if self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.GetFullPath()[-1] == PlatformIndicator.GetFileSeparator(True):
return True return True
else: else:
return os.path.isdir(self.OriginFullPath) return os.path.isdir(self.OriginFullPath)
@@ -637,8 +786,11 @@ class ToolFile(BaseModel):
ignore_directories: 是否忽略目录事件 ignore_directories: 是否忽略目录事件
case_sensitive: 是否区分大小写 case_sensitive: 是否区分大小写
""" """
try:
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
except ImportError as e:
ImportingThrow(e, "File", ["watchdog"])
if not self.Exists(): if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}") raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
@@ -1000,31 +1152,3 @@ class ToolFile(BaseModel):
""" """
return self.get_permissions()['hidden'] return self.get_permissions()['hidden']
def split_elements(
file: Union[ToolFile, str],
*,
ratios: List[float] = [1,1],
pr: Optional[Callable[[ToolFile], bool]] = None,
shuffler: Optional[Callable[[List[ToolFile]], None]] = None,
output_dirs: Optional[List[ToolFile]] = None,
output_must_exist: bool = True,
output_callback: Optional[Callable[[ToolFile], None]] = None
) -> List[List[ToolFile]]:
result: List[List[ToolFile]] = tool_split_elements(WrapperFile(file).dir_tool_file_iter(),
ratios=ratios,
pr=pr,
shuffler=shuffler)
if output_dirs is None:
return result
for i in range(min(len(output_dirs), len(result))):
output_dir: ToolFile = output_dirs[i]
if output_dir.IsDir() is False:
raise Exception("Outputs must be directory")
if output_must_exist:
output_dir.must_exists_as_new()
for file in result[i]:
current = output_dirs[i].MakeFileInside(file)
if output_callback:
output_callback(current)
return result

View File

@@ -53,6 +53,7 @@ class GlobalConfig:
# 检查配置文件,不存在则生成空配置 # 检查配置文件,不存在则生成空配置
self._data_pair: Dict[str, Any] = {} self._data_pair: Dict[str, Any] = {}
self._data_find: Dict[str, Any] = {}
self._const_config_file = ConstConfigFile self._const_config_file = ConstConfigFile
config_file = self.ConfigFile config_file = self.ConfigFile
@@ -160,7 +161,8 @@ class GlobalConfig:
"""保存配置到文件""" """保存配置到文件"""
config = self.ConfigFile config = self.ConfigFile
config.SaveAsJson({ config.SaveAsJson({
"properties": self._data_pair "properties": self._data_pair,
"find": self._data_find
}) })
return self return self
@@ -232,6 +234,7 @@ class GlobalConfig:
return self._data_pair[key] return self._data_pair[key]
else: else:
self.LogPropertyNotFound(key, default=default) self.LogPropertyNotFound(key, default=default)
self._data_find[key] = default
return default return default

View File

@@ -0,0 +1,743 @@
from .Config import *
from .File import ToolFile
from .Web import ToolURL
import json
import urllib.parse
import os
from typing import *
try:
from pydantic import BaseModel, PrivateAttr, Field
except ImportError as e:
ImportingThrow(e, "Interaction", ["pydantic"])
try:
import aiofiles
except ImportError as e:
ImportingThrow(e, "Interaction", ["aiofiles"])
class InteractionError(Exception):
"""交互操作异常基类"""
pass
class PathValidationError(InteractionError):
"""路径验证异常"""
pass
class LoadError(InteractionError):
"""加载异常"""
pass
class SaveError(InteractionError):
"""保存异常"""
pass
class Interaction(BaseModel):
"""统一的文件交互类,自适应处理本地文件和网络文件"""
originPath: str
_is_url: bool = PrivateAttr(False)
_is_local: bool = PrivateAttr(False)
_tool_file: Optional[ToolFile] = PrivateAttr(None)
_tool_url: Optional[ToolURL] = PrivateAttr(None)
def __init__(self, path):
"""
从路径字符串创建对象自动识别本地文件或网络URL
Args:
path: 路径字符串或是可以转换为路径字符串的对象
"""
super().__init__(originPath=str(path))
# 自动识别路径类型
self._detect_path_type()
def _detect_path_type(self):
"""自动检测路径类型"""
path = self.originPath.strip()
# 检查是否为HTTP/HTTPS URL
if path.startswith(('http://', 'https://', 'file://')):
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(path)
return
# 检查是否为localhost URL
if path.startswith('localhost'):
# 转换为完整的HTTP URL
if not path.startswith('localhost:'):
# 默认端口80
full_url = f"http://{path}"
else:
full_url = f"http://{path}"
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(full_url)
self.originPath = full_url
return
# 检查是否为绝对路径或相对路径
if (os.path.isabs(path) or
path.startswith('./') or
path.startswith('../') or
':' in path[:3]): # Windows盘符
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
return
# 默认作为相对路径处理
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
def __str__(self) -> str:
"""隐式字符串转换"""
return self.originPath
def __bool__(self) -> bool:
"""隐式布尔转换,检查路径是否有效"""
return self.IsValid
@property
def IsValid(self) -> bool:
"""检查路径是否有效"""
if self._is_url:
return self._tool_url.IsValid if self._tool_url else False
else:
return self._tool_file.Exists() if self._tool_file else False
@property
def IsURL(self) -> bool:
"""是否为网络URL"""
return self._is_url
@property
def IsLocal(self) -> bool:
"""是否为本地文件"""
return self._is_local
@property
def IsFile(self) -> bool:
"""是否为文件对于URL检查是否存在文件名"""
if self._is_url:
return bool(self._tool_url.GetFilename()) if self._tool_url else False
else:
return self._tool_file.IsFile() if self._tool_file else False
@property
def IsDir(self) -> bool:
"""是否为目录(仅对本地路径有效)"""
if self._is_local:
return self._tool_file.IsDir() if self._tool_file else False
return False
def GetFilename(self) -> str:
"""获取文件名"""
if self._is_url:
return self._tool_url.GetFilename() if self._tool_url else ""
else:
return self._tool_file.GetFilename() if self._tool_file else ""
def GetExtension(self) -> str:
"""获取文件扩展名"""
if self._is_url:
return self._tool_url.GetExtension() if self._tool_url else ""
else:
return self._tool_file.GetExtension() if self._tool_file else ""
def ExtensionIs(self, *extensions: str) -> bool:
"""检查扩展名是否匹配"""
if self._is_url:
return self._tool_url.ExtensionIs(*extensions) if self._tool_url else False
else:
current_ext = self.GetExtension()
return current_ext.lower() in [ext.lower().lstrip('.') for ext in extensions]
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
def Open(self, path: str) -> 'Interaction':
"""在当前对象上打开新路径"""
new_obj = Interaction(path)
self.originPath = new_obj.originPath
self._is_url = new_obj._is_url
self._is_local = new_obj._is_local
self._tool_file = new_obj._tool_file
self._tool_url = new_obj._tool_url
return self
# 同步加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsText()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsText()
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsBinary()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsBinary()
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsJson(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
json_data = self._tool_file.LoadAsJson()
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
# 异步加载方法
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsTextAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'r', encoding='utf-8') as f:
return await f.read()
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsBinaryAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'rb') as f:
return await f.read()
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsJsonAsync(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地JSON文件
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise LoadError(f"Failed to parse JSON from {self.originPath}: {str(e)}")
# 同步保存方法
def SaveAsText(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL先下载然后保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsText(content)
return self
def SaveAsBinary(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsBinary(content)
return self
def SaveAsJson(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(data)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsJson(data)
return self
# 异步保存方法
async def SaveAsTextAsync(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
return self
async def SaveAsBinaryAsync(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'wb') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'wb') as f:
await f.write(content)
return self
async def SaveAsJsonAsync(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
# 序列化JSON数据
try:
from pydantic import BaseModel
if isinstance(data, BaseModel):
json_data = data.model_dump()
json_data["__type"] = f"{data.__class__.__name__}, pydantic.BaseModel"
else:
json_data = data
json_content = json.dumps(json_data, indent=4, ensure_ascii=False)
except Exception as e:
raise SaveError(f"Failed to serialize JSON data: {str(e)}")
# 保存JSON内容
return await self.SaveAsTextAsync(json_content, local_path)
# HTTP请求方法仅对URL有效
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Get(callback)
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Post(callback, form_data)
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.GetAsync(callback)
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.PostAsync(callback, form_data)
# 便利方法
def Save(self, local_path: Optional[str] = None) -> 'Interaction':
"""
自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL先下载内容再保存
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
self._tool_url.Save(local_path)
return self
async def SaveAsync(self, local_path: Optional[str] = None) -> 'Interaction':
"""
异步自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL异步下载内容
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
await self.SaveAsTextAsync(content, local_path)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
await self.SaveAsJsonAsync(content, local_path)
else:
content = await self.LoadAsBinaryAsync()
await self.SaveAsBinaryAsync(content, local_path)
except Exception as e:
raise SaveError(f"Failed to save {self.originPath}: {str(e)}")
return self
def Downloadable(self) -> bool:
"""检查是否可下载"""
return self._is_url and self._tool_url.IsValid if self._tool_url else False
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("Download method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.Download(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("DownloadAsync method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.DownloadAsync(local_path)
def Copy(self, target_path) -> ToolFile:
"""
复制文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
新的Interaction对象
"""
if not self._is_local:
raise InteractionError("Copy method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Copy(str(target_path))
def Move(self, target_path) -> ToolFile:
"""
移动文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
更新后的Interaction对象
"""
if not self._is_local:
raise InteractionError("Move method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Move(str(target_path))
def Remove(self) -> 'Interaction':
"""
删除文件(仅对本地文件有效)
Returns:
Interaction对象本身
"""
if not self._is_local:
raise InteractionError("Remove method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.Remove()
return self
def Exists(self) -> bool:
"""
检查文件是否存在
Returns:
是否存在
"""
return self.IsValid
def GetSize(self) -> int:
"""
获取文件大小(仅对本地文件有效)
Returns:
文件大小(字节)
"""
if not self._is_local:
raise InteractionError("GetSize method is only available for local files")
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.GetSize()
def GetDir(self) -> str:
"""
获取目录路径
Returns:
目录路径
"""
if self._is_local:
return self._tool_file.GetDir() if self._tool_file else ""
else:
# 对于URL返回基础URL
if self._tool_url:
parsed = urllib.parse.urlparse(self._tool_url.url)
return f"{parsed.scheme}://{parsed.netloc}"
return ""
def GetParentDir(self) -> 'Interaction':
"""
获取父目录的Interaction对象
Returns:
父目录的Interaction对象
"""
if self._is_local:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
parent_dir = self._tool_file.GetParentDir()
return Interaction(parent_dir.GetFullPath())
else:
# 对于URL返回基础URL
base_url = self.GetDir()
return Interaction(base_url)
def ToString(self) -> str:
"""获取完整路径"""
return self.originPath
def GetFullPath(self) -> str:
"""获取完整路径"""
return self.originPath

View File

@@ -241,7 +241,7 @@ def ToType(
type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None) type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None)
type_final = type_components[-1] type_final = type_components[-1]
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\ PrintColorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
f"typen: {typen}, type_components: {type_components}") f"typen: {typen}, type_components: {type_components}")
if type_module is not None: if type_module is not None:
return sys.modules[type_module].__dict__[type_final] return sys.modules[type_module].__dict__[type_final]
@@ -304,7 +304,7 @@ def DecayType(
return type_hint return type_hint
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}") PrintColorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
result: type|List[type] = None result: type|List[type] = None
@@ -333,7 +333,7 @@ def DecayType(
raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>") raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>")
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Result: {result}") PrintColorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
return result return result
def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool: def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool:
@@ -456,7 +456,7 @@ class ValueInfo(BaseInfo):
super().__init__(**kwargs) super().__init__(**kwargs)
self._RealType = metaType self._RealType = metaType
if GetInternalReflectionDebug() and len(generic_args) > 0: if GetInternalReflectionDebug() and len(generic_args) > 0:
print_colorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
f"metaType={metaType}, generic_args={generic_args}") f"metaType={metaType}, generic_args={generic_args}")
self._GenericArgs = generic_args self._GenericArgs = generic_args
if not isinstance(metaType, type): if not isinstance(metaType, type):
@@ -546,7 +546,7 @@ class ValueInfo(BaseInfo):
**kwargs **kwargs
) -> 'ValueInfo': ) -> 'ValueInfo':
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\ PrintColorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
f"metaType={metaType}, SelfType={SelfType}") f"metaType={metaType}, SelfType={SelfType}")
if isinstance(metaType, type): if isinstance(metaType, type):
if metaType is list: if metaType is list:
@@ -565,7 +565,7 @@ class ValueInfo(BaseInfo):
return ValueInfo(metaType, **kwargs) return ValueInfo(metaType, **kwargs)
else: else:
return ValueInfo(type_, **kwargs) return ValueInfo(type_, **kwargs)
elif isinstance(metaType, Self):#metaType is Self: elif metaType is Self:
if SelfType is None: if SelfType is None:
raise ReflectionException("SelfType is required when metaType is <Self>") raise ReflectionException("SelfType is required when metaType is <Self>")
return ValueInfo.Create(SelfType, **kwargs) return ValueInfo.Create(SelfType, **kwargs)
@@ -601,7 +601,7 @@ class FieldInfo(MemberInfo):
selfType: type|Any|None = None selfType: type|Any|None = None
): ):
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\ PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ") f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ")
super().__init__( super().__init__(
name = name, name = name,
@@ -611,7 +611,7 @@ class FieldInfo(MemberInfo):
) )
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType) self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\ PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}") f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}")
@property @property
@@ -746,7 +746,7 @@ class MethodInfo(MemberInfo):
is_class_method: bool, is_class_method: bool,
): ):
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})") f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})")
MemberInfo.__init__(self, name, ctype, is_static, is_public) MemberInfo.__init__(self, name, ctype, is_static, is_public)
self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType) self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType)
@@ -1143,12 +1143,12 @@ class RefType(ValueInfo):
def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]: def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]:
if currentType.IsPrimitive: if currentType.IsPrimitive:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return f"{currentType.RealType}" return f"{currentType.RealType}"
elif currentType.RealType in type_set: elif currentType.RealType in type_set:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return { return {
"type": f"{currentType.RealType}", "type": f"{currentType.RealType}",
@@ -1156,13 +1156,13 @@ class RefType(ValueInfo):
} }
else: else:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
type_set.add(currentType.RealType) type_set.add(currentType.RealType)
value = {} value = {}
fields = currentType.GetFields() fields = currentType.GetFields()
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}") PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
for field in fields: for field in fields:
value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)) value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType))
return { return {
@@ -1198,7 +1198,7 @@ class RefType(ValueInfo):
# 确保正确地实现所有GetBase*方法 # 确保正确地实现所有GetBase*方法
@functools.lru_cache(maxsize=128) @functools.lru_cache(maxsize=128)
def GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]: def _GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
if self._BaseTypes is None: if self._BaseTypes is None:
self._InitBaseTypesIfNeeded() self._InitBaseTypesIfNeeded()
result = [] result = []
@@ -1206,8 +1206,11 @@ class RefType(ValueInfo):
result.extend(baseType.GetFields(flag)) result.extend(baseType.GetFields(flag))
return result return result
def GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
return self._GetBaseFields(flag)
@functools.lru_cache(maxsize=128) @functools.lru_cache(maxsize=128)
def GetAllBaseFields(self) -> List[FieldInfo]: def _GetAllBaseFields(self) -> List[FieldInfo]:
if self._BaseTypes is None: if self._BaseTypes is None:
self._InitBaseTypesIfNeeded() self._InitBaseTypesIfNeeded()
result = [] result = []
@@ -1215,9 +1218,12 @@ class RefType(ValueInfo):
result.extend(baseType.GetAllFields()) result.extend(baseType.GetAllFields())
return result return result
def GetAllBaseFields(self) -> List[FieldInfo]:
return self._GetAllBaseFields()
# 修改所有的GetBase*方法 # 修改所有的GetBase*方法
@functools.lru_cache(maxsize=128) @functools.lru_cache(maxsize=128)
def GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]: def _GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
if self._BaseTypes is None: if self._BaseTypes is None:
self._InitBaseTypesIfNeeded() self._InitBaseTypesIfNeeded()
result = [] result = []
@@ -1225,8 +1231,11 @@ class RefType(ValueInfo):
result.extend(baseType.GetMethods(flag)) result.extend(baseType.GetMethods(flag))
return result return result
def GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
return self._GetBaseMethods(flag)
@functools.lru_cache(maxsize=128) @functools.lru_cache(maxsize=128)
def GetAllBaseMethods(self) -> List[MethodInfo]: def _GetAllBaseMethods(self) -> List[MethodInfo]:
if self._BaseTypes is None: if self._BaseTypes is None:
self._InitBaseTypesIfNeeded() self._InitBaseTypesIfNeeded()
result = [] result = []
@@ -1234,8 +1243,11 @@ class RefType(ValueInfo):
result.extend(baseType.GetAllMethods()) result.extend(baseType.GetAllMethods())
return result return result
def GetAllBaseMethods(self) -> List[MethodInfo]:
return self._GetAllBaseMethods()
@functools.lru_cache(maxsize=128) @functools.lru_cache(maxsize=128)
def GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]: def _GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
if self._BaseTypes is None: if self._BaseTypes is None:
self._InitBaseTypesIfNeeded() self._InitBaseTypesIfNeeded()
result = [] result = []
@@ -1243,8 +1255,11 @@ class RefType(ValueInfo):
result.extend(baseType.GetMembers(flag)) result.extend(baseType.GetMembers(flag))
return result return result
def GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
return self._GetBaseMembers(flag)
@functools.lru_cache(maxsize=128) @functools.lru_cache(maxsize=128)
def GetAllBaseMembers(self) -> List[MemberInfo]: def _GetAllBaseMembers(self) -> List[MemberInfo]:
if self._BaseTypes is None: if self._BaseTypes is None:
self._InitBaseTypesIfNeeded() self._InitBaseTypesIfNeeded()
result = [] result = []
@@ -1252,6 +1267,9 @@ class RefType(ValueInfo):
result.extend(baseType.GetAllMembers()) result.extend(baseType.GetAllMembers())
return result return result
def GetAllBaseMembers(self) -> List[MemberInfo]:
return self._GetAllBaseMembers()
def GetFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]: def GetFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
self._ensure_initialized() self._ensure_initialized()
if flag == RefTypeFlag.Default: if flag == RefTypeFlag.Default:
@@ -1411,7 +1429,7 @@ class TypeManager(BaseModel):
if data is None: if data is None:
raise ReflectionException("data is None") raise ReflectionException("data is None")
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}") PrintColorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型 # 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache: if isinstance(data, str) and data in self._string_to_type_cache:
@@ -1436,7 +1454,7 @@ class TypeManager(BaseModel):
# 添加到弱引用缓存 # 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type) self._weak_refs[type_id] = weakref.ref(ref_type)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Get "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Get "\
f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\ f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\
f"{ConsoleFrontColor.RESET}{ref_type.ToString()}") f"{ConsoleFrontColor.RESET}{ref_type.ToString()}")
return ref_type return ref_type
@@ -1489,7 +1507,7 @@ class TypeManager(BaseModel):
try: try:
ref_type = RefType(metaType) ref_type = RefType(metaType)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Create "\ PrintColorful(ConsoleFrontColor.RED, f"Create "\
f"{ConsoleFrontColor.RESET}{metaType} "\ f"{ConsoleFrontColor.RESET}{metaType} "\
f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}") f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}")
self._RefTypes[metaType] = ref_type self._RefTypes[metaType] = ref_type

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,264 +0,0 @@
# Visual 模块
Visual模块提供了数据可视化和图像处理相关的功能包括数据图表、图像处理、词云等。
## 目录结构
- `Core.py`: 核心数据可视化功能
- `OpenCV.py`: OpenCV图像处理功能
- `WordCloud.py`: 词云生成功能
- `Manim.py`: 数学动画功能
## 功能特性
### 1. 数据可视化 (Core.py)
#### 1.1 基础图表
- 折线图
- 柱状图
- 散点图
- 直方图
- 饼图
- 箱线图
- 热力图
- 分类数据图
- 联合图
#### 1.2 数据处理
- 缺失值处理
- 重复值处理
- 数据标准化
- 数据归一化
### 2. 图像处理 (OpenCV.py)
#### 2.1 图像操作
- 图像加载
- 支持多种格式jpg, png, bmp等
- 支持从文件路径或URL加载
- 支持从内存缓冲区加载
- 图像保存
- 支持多种格式输出
- 支持质量参数设置
- 支持压缩选项
- 图像显示
- 支持窗口标题设置
- 支持窗口大小调整
- 支持键盘事件处理
- 图像转换
- RGB转灰度
- RGB转HSV
- RGB转LAB
- 支持自定义转换矩阵
- 图像缩放
- 支持多种插值方法
- 支持保持宽高比
- 支持指定目标尺寸
- 图像旋转
- 支持任意角度旋转
- 支持旋转中心点设置
- 支持旋转后尺寸调整
- 图像翻转
- 水平翻转
- 垂直翻转
- 对角线翻转
- 图像合并
- 支持多图像拼接
- 支持透明度混合
- 支持蒙版处理
#### 2.2 ImageObject类详解
ImageObject类提供了完整的图像处理功能
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 基本属性
width = image.width # 图像宽度
height = image.height # 图像高度
channels = image.channels # 通道数
dtype = image.dtype # 数据类型
# 图像处理
image.resize_image(800, 600) # 调整大小
image.convert_to_grayscale() # 转换为灰度图
image.filter_gaussian((5, 5), 1.5, 1.5) # 高斯滤波
image.rotate_image(45) # 旋转45度
image.flip_image(horizontal=True) # 水平翻转
# 图像增强
image.adjust_brightness(1.2) # 调整亮度
image.adjust_contrast(1.5) # 调整对比度
image.adjust_saturation(0.8) # 调整饱和度
image.equalize_histogram() # 直方图均衡化
# 边缘检测
image.detect_edges(threshold1=100, threshold2=200) # Canny边缘检测
image.detect_contours() # 轮廓检测
# 特征提取
keypoints = image.detect_keypoints() # 关键点检测
descriptors = image.compute_descriptors() # 描述子计算
# 图像保存
image.save_image("output.jpg", quality=95) # 保存图像
image.save_image("output.png", compression=9) # 保存PNG
# 图像显示
image.show_image("预览") # 显示图像
image.wait_key(0) # 等待按键
# 图像信息
print(image.get_info()) # 获取图像信息
print(image.get_histogram()) # 获取直方图
```
#### 2.3 图像增强
- 边缘检测
- 滤波处理
- 阈值处理
- 形态学操作
- 轮廓检测
- 特征匹配
#### 2.4 视频处理
- 视频读取
- 视频写入
- 摄像头控制
- 帧处理
### 3. 词云生成 (WordCloud.py)
#### 3.1 词云功能
- 词云创建
- 标题设置
- 渲染输出
- 样式定制
### 4. 数学动画 (Manim.py)
#### 4.1 动画功能
- 数学公式动画
- 几何图形动画
- 图表动画
- 场景管理
## 使用示例
### 1. 数据可视化示例
```python
from Convention.Visual import Core
# 创建数据可视化生成器
generator = Core.data_visual_generator("data.csv")
# 绘制折线图
generator.plot_line("x", "y", title="折线图示例")
# 绘制柱状图
generator.plot_bar("category", "value", title="柱状图示例")
# 绘制散点图
generator.plot_scatter("x", "y", title="散点图示例")
# 绘制饼图
generator.plot_pie("category", title="饼图示例")
```
### 2. 图像处理示例
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 图像处理
image.resize_image(800, 600)
image.convert_to_grayscale()
image.filter_gaussian((5, 5), 1.5, 1.5)
# 保存图像
image.save_image("output.jpg")
```
### 3. 词云生成示例
```python
from Convention.Visual import WordCloud
# 创建词云
wordcloud = WordCloud.make_word_cloud("词云", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
])
# 设置标题
WordCloud.set_title(wordcloud, "编程语言词云")
# 渲染输出
WordCloud.render_to(wordcloud, "wordcloud.html")
```
### 4. 视频处理示例
```python
from Convention.Visual import OpenCV
# 创建视频捕获对象
camera = OpenCV.light_cv_camera(0)
# 创建视频写入对象
writer = OpenCV.VideoWriterInstance(
"output.avi",
OpenCV.avi_with_Xvid_fourcc(),
30.0,
(640, 480)
)
# 录制视频
def stop_condition():
return OpenCV.is_current_key('q')
camera.recording(stop_condition, writer)
```
## 依赖项
- matplotlib: 数据可视化
- seaborn: 高级数据可视化
- opencv-python: 图像处理
- pyecharts: 词云生成
- manim: 数学动画
## 注意事项
1. 使用图像处理时注意内存占用
2. 视频处理时注意帧率设置
3. 词云生成时注意数据量
4. 动画制作时注意性能优化
## 性能优化
1. 使用图像处理时注意批量处理
2. 视频处理时使用合适的编码格式
3. 词云生成时控制词数
4. 动画制作时优化渲染设置
## 贡献指南
欢迎提交Issue和Pull Request来改进功能或添加新特性。

View File

@@ -1,66 +0,0 @@
from ..Internal import *
from pyecharts.charts import WordCloud
from pyecharts import options as opts
from pyecharts import types
#from ..File.Core import tool_file, UnWrapper as UnWrapper2Str
def make_word_cloud(
series_name: str,
data_pair: Sequence[Tuple[str, int]],
**kwargs,
):
wordcloud = WordCloud()
wordcloud.add(series_name, data_pair, **kwargs)
return wordcloud
def set_title(
wordcloud: WordCloud,
title: str
):
wordcloud.set_global_opts(
title_opts=opts.TitleOpts(title=title)
)
def render_to(
wordcloud: WordCloud,
file_name: Union[tool_file, str]
):
wordcloud.render(UnWrapper2Str(file_name))
class light_word_cloud(left_value_reference[WordCloud]):
def __init__(
self,
series_name: str,
data_pair: types.Sequence,
**kwargs,
):
super().__init__(make_word_cloud(series_name, data_pair, **kwargs))
def set_title(
self,
title: str
):
set_title(self.ref_value, title)
def render_to(
self,
file_name: Union[tool_file, str]
):
render_to(self.ref_value, file_name)
if __name__ == "__main__":
# 准备数据
wordcloud = make_word_cloud("", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
("Go", 60),
("Rust", 50),
("C#", 40),
("PHP", 30),
("Swift", 20),
("Kotlin", 10),
], word_size_range=[20, 100])
set_title(wordcloud, "cloud")
render_to(wordcloud, "wordcloud.html")

515
Convention/Runtime/Web.py Normal file
View File

@@ -0,0 +1,515 @@
from .Config import *
from .File import ToolFile
import json
import urllib.parse
import urllib.request
import urllib.error
import asyncio
import os
import re
from typing import *
from pydantic import BaseModel
try:
import aiohttp
import aiofiles
except ImportError as e:
ImportingThrow(e, "Web", ["aiohttp", "aiofiles"])
class WebError(Exception):
"""网络操作异常基类"""
pass
class URLValidationError(WebError):
"""URL验证异常"""
pass
class HTTPRequestError(WebError):
"""HTTP请求异常"""
pass
class DownloadError(WebError):
"""下载异常"""
pass
class ToolURL(BaseModel):
"""网络URL工具类提供HTTP客户端和URL操作功能"""
url: str
def __init__(self, url: Union[str, 'ToolURL']):
"""
从URL字符串创建对象
Args:
url: URL字符串或ToolURL对象
"""
if isinstance(url, ToolURL):
url = url.url
super().__init__(url=str(url))
def __str__(self) -> str:
"""隐式字符串转换"""
return self.url
def __bool__(self) -> bool:
"""隐式布尔转换等同于IsValid"""
return self.IsValid
def ToString(self) -> str:
"""获取完整URL"""
return self.url
def GetFullURL(self) -> str:
"""获取完整URL"""
return self.url
@property
def FullURL(self) -> str:
"""获取完整URL属性"""
return self.url
@property
def IsValid(self) -> bool:
"""检查URL是否有效"""
return self.ValidateURL()
def ValidateURL(self) -> bool:
"""
验证URL格式
Returns:
是否为有效的HTTP/HTTPS URL
"""
try:
parsed = urllib.parse.urlparse(self.url)
return parsed.scheme in ('http', 'https') and parsed.netloc != ''
except Exception:
return False
def GetFilename(self) -> str:
"""
获取URL中的文件名
Returns:
URL路径中的文件名
"""
try:
parsed = urllib.parse.urlparse(self.url)
path = parsed.path
if path:
return os.path.basename(path)
return ""
except Exception:
return ""
def GetExtension(self) -> str:
"""
获取文件扩展名
Returns:
文件扩展名(不包含点)
"""
filename = self.GetFilename()
if '.' in filename:
return filename.split('.')[-1].lower()
return ""
def ExtensionIs(self, *extensions: str) -> bool:
"""
检查扩展名是否匹配
Args:
*extensions: 要检查的扩展名列表
Returns:
是否匹配任一扩展名
"""
current_ext = self.GetExtension()
return current_ext in [ext.lower().lstrip('.') for ext in extensions]
def Open(self, url: str) -> 'ToolURL':
"""
在当前对象上打开新URL
Args:
url: 新的URL字符串
Returns:
更新后的ToolURL对象
"""
self.url = str(url)
return self
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件txt, html, htm, css, js, xml, csv"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件jpg, jpeg, png, gif, bmp, svg"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件pdf, doc, docx, xls, xlsx, ppt, pptx"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
# HTTP请求方法
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
with urllib.request.urlopen(self.url) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
data = None
if form_data:
data = urllib.parse.urlencode(form_data).encode('utf-8')
req = urllib.request.Request(self.url, data=data, method='POST')
if form_data:
req.add_header('Content-Type', 'application/x-www-form-urlencoded')
with urllib.request.urlopen(req) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
# 异步HTTP请求方法
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
async with aiohttp.ClientSession() as session:
async with session.post(self.url, data=form_data) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
# 内容加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
with urllib.request.urlopen(self.url) as response:
content = response.read()
# 尝试检测编码
encoding = response.headers.get_content_charset() or 'utf-8'
return content.decode(encoding)
except Exception as e:
raise HTTPRequestError(f"Failed to load text from {self.url}: {str(e)}")
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
return await response.text()
except Exception as e:
raise HTTPRequestError(f"Failed to load text from {self.url}: {str(e)}")
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
with urllib.request.urlopen(self.url) as response:
return response.read()
except Exception as e:
raise HTTPRequestError(f"Failed to load binary from {self.url}: {str(e)}")
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
return await response.read()
except Exception as e:
raise HTTPRequestError(f"Failed to load binary from {self.url}: {str(e)}")
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
text_content = self.LoadAsText()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise HTTPRequestError(f"Failed to parse JSON from {self.url}: {str(e)}")
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise HTTPRequestError(f"Failed to parse JSON from {self.url}: {str(e)}")
# 文件保存和下载功能
def Save(self, local_path: Optional[str] = None) -> ToolFile:
"""
自动选择格式保存到本地
Args:
local_path: 本地保存路径如果为None则自动生成
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
if self.IsText:
return self.SaveAsText(local_path)
elif self.IsJson:
return self.SaveAsJson(local_path)
else:
return self.SaveAsBinary(local_path)
def SaveAsText(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为文本文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
text_content = self.LoadAsText()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(text_content)
return file_obj
def SaveAsJson(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为JSON文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
json_data = self.LoadAsJson()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(json_data)
return file_obj
def SaveAsBinary(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为二进制文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
binary_content = self.LoadAsBinary()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(binary_content)
return file_obj
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
同步下载文件
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
return self.Save(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
file_obj.SaveAsText(content)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
file_obj.SaveAsJson(content)
else:
content = await self.LoadAsBinaryAsync()
file_obj.SaveAsBinary(content)
return file_obj
except Exception as e:
raise DownloadError(f"Failed to download {self.url}: {str(e)}")
# 静态HTTP客户端实例避免连接池耗尽
_http_session: Optional[aiohttp.ClientSession] = None
async def get_http_session() -> aiohttp.ClientSession:
"""获取全局HTTP会话实例"""
global _http_session
if _http_session is None or _http_session.closed:
_http_session = aiohttp.ClientSession()
return _http_session
async def close_http_session():
"""关闭全局HTTP会话"""
global _http_session
if _http_session and not _http_session.closed:
await _http_session.close()
_http_session = None

254
README.md
View File

@@ -0,0 +1,254 @@
# Convention-Python
Convention-Python基于 Convention-Template 规范实现的一套完整的开发工具集。
## 主要内容
### 辅助 (Config.py)
- **内置依赖**: 提供辅助函数与辅助类型
### 架构 (Architecture.py)
- **依赖注入容器**: 支持类型注册、依赖解析和生命周期管理
- **信号系统**: 提供发布-订阅模式的消息通信机制
- **时间线管理**: 支持基于条件的任务队列和执行流程控制
- **单例模式**: 内置单例模型支持
### 异步 (Asynchrony.py)
- **线程管理**: 提供线程实例、原子操作、锁机制
- **并发控制**: 支持线程安全的数据结构和操作
- **异步工具**: 简化异步编程的工具函数
### 配置 (GlobalConfig.py)
- **类型系统**: 强大的类型检查和转换系统
- **调试支持**: 内置调试模式和彩色输出
- **平台兼容**: 跨平台路径和环境管理
- **全局配置**: 统一的配置管理机制
### 文件 (File.py)
- **ToolFile 类**: 强大的文件操作封装
- 支持多种文件格式 (JSON, CSV, Excel, 图像, 音频, Word文档等)
- 文件压缩和解压缩 (ZIP, TAR)
- 文件加密和解密
- 哈希值计算和验证
- 文件监控和备份
- 权限管理
- **批量处理**: 支持文件批量操作和处理
### 序列化 (EasySave.py)
- **序列化支持**: JSON 和二进制格式的序列化
- **反射集成**: 基于反射的对象序列化和反序列化
- **备份机制**: 自动备份和恢复功能
- **字段过滤**: 支持自定义字段选择和忽略规则
### 反射 (Reflection.py)
- **类型管理**: 完整的类型信息管理和缓存
- **成员访问**: 字段和方法的动态访问
- **类型转换**: 灵活的类型转换和验证
- **泛型支持**: 支持泛型类型的处理
### 视觉 (Visual)
#### 可视化 (Visual/Core.py)
- **图表生成**: 支持多种图表类型 (折线图、柱状图、散点图、饼图等)
- **数据处理**: 数据清洗、标准化、归一化
- **样式定制**: 丰富的图表样式和主题选项
#### 图像处理 (Visual/OpenCV.py)
- **ImageObject 类**: 完整的图像处理功能
- **图像增强**: 支持 30+ 种图像增强算法
- **格式转换**: 支持多种图像格式转换
- **批量处理**: 支持图像批量处理和增强
#### 词云生成 (Visual/WordCloud.py)
- **词云创建**: 支持中英文词云生成
- **样式定制**: 丰富的样式和布局选项
### 字符串工具 (String.py)
- **字符串处理**: 长度限制、填充、编码转换
- **中文分词**: 集成 jieba 分词支持
## 安装说明
### 环境要求
- Python >= 3.12
- 操作系统: Windows, Linux, macOS
### 依赖包
运行时自动报告需要被引入的包
调用Config中ReleaseFailed2Requirements函数生成requirements.txt文件
### 安装方式
1. **从源码安装**:
```bash
git clone https://github.com/NINEMINEsigma/Convention-Python.git
cd Convention-Python
pip install -e .
```
2. **直接安装**:
```bash
pip install .
```
3. **打包安装**
```bash
pip install build
python -m build
pip install dist/convention.tar.gz
```
## 🚀 使用示例
### 架构模式示例
```python
from Convention.Runtime import Architecture
# 注册服务
class DatabaseService:
def query(self, sql): return "result"
db_service = DatabaseService()
Architecture.Register(DatabaseService, db_service, lambda: print("DB服务初始化"))
# 获取服务
service = Architecture.Get(DatabaseService)
result = service.query("SELECT * FROM users")
```
### 文件操作示例
```python
from Convention.Runtime import ToolFile
# 创建文件对象
file = ToolFile("data.json")
# 保存和加载 JSON 数据
data = {"name": "张三", "age": 25}
file.SaveAsJson(data)
loaded_data = file.LoadAsJson()
# 文件压缩
compressed = file.Compress("backup.zip")
# 计算哈希值
hash_value = file.calculate_hash("sha256")
```
### 数据序列化示例
```python
from Convention.Runtime import EasySave
from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
email: str
# 保存数据
user = User(name="李四", age=30, email="lisi@example.com")
EasySave.Write(user, "user.json")
# 读取数据
loaded_user = EasySave.Read(User, "user.json")
```
### 数据可视化示例
```python
from Convention.Runtime.Visual import Core
import pandas as pd
# 创建数据可视化生成器
df = pd.read_csv("sales_data.csv")
generator = Core.data_visual_generator("sales_data.csv")
# 绘制图表
generator.plot_line("month", "sales", title="月度销售趋势")
generator.plot_bar("product", "revenue", title="产品收入对比")
generator.plot_pie("category", title="类别分布")
```
### 图像处理示例
```python
from Convention.Runtime.Visual.OpenCV import ImageObject
from Convention.Runtime.Visual.Core import ImageAugmentConfig, ResizeAugmentConfig
# 加载图像
image = ImageObject("input.jpg")
# 图像增强配置
config = ImageAugmentConfig(
resize=ResizeAugmentConfig(width=800, height=600),
lighting=LightingAugmentConfig(lighting=20),
contrast=ContrastAugmentConfig(contrast=1.2)
)
# 批量增强
results = config.augment_from_dir_to("input_dir", "output_dir")
```
## 打包指令
### 构建分发包
```bash
# 清理之前的构建文件
python setup.py clean --all
rm -rf build/ dist/ *.egg-info/
# 构建源码包和轮子包
python setup.py sdist bdist_wheel
# 或使用 build 工具 (推荐)
pip install build
python -m build
```
### 安装本地包
```bash
# 开发模式安装 (可编辑安装)
pip install -e .
# 普通安装
pip install .
```
### 上传到 PyPI
```bash
# 安装上传工具
pip install twine
# 检查包
twine check dist/*
# 上传到测试 PyPI
twine upload --repository-url https://test.pypi.org/legacy/ dist/*
# 上传到正式 PyPI
twine upload dist/*
```
### 创建可执行文件
```bash
# 使用 PyInstaller
pip install pyinstaller
pyinstaller --onefile --name convention-tool your_main_script.py
```
## 许可证
本项目采用 MIT 许可证 - 查看 [LICENSE](LICENSE) 文件了解详情。
## 作者
**LiuBai** - [NINEMINEsigma](https://github.com/NINEMINEsigma)
## 相关链接
- [Convention-Template](https://github.com/NINEMINEsigma/Convention-Template) - 项目模板规范
- [GitHub Issues](https://github.com/NINEMINEsigma/Convention-Python/issues) - 问题反馈
- [GitHub Releases](https://github.com/NINEMINEsigma/Convention-Python/releases) - 版本发布
*最后更新: 2025年9月*

View File

@@ -1,20 +0,0 @@
{
"easy": {
"__type": "__main__.test_log, Global",
"value": {
"__type": "__main__.test_log, Global",
"model_computed_fields": {
"__type": "typing.Any, Global"
},
"model_extra": null,
"model_fields": {
"__type": "typing.Any, Global"
},
"model_fields_set": {
"__type": "typing.Any, Global"
},
"test_field": 1,
"test_field_2": "test"
}
}
}

View File

@@ -1,19 +1,10 @@
import sys import sys
import os import os
from time import sleep
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Convention.Runtime.Config import * from Convention.Runtime.File import *
from Convention.Runtime.EasySave import *
class Test: file = ToolFile("[Test]")|"temp"|None
test_field:int = 10 print(file.MustExistsPath())
class_test_field:int = 20
def __init__(self):
self.test_field:int = 0
def run():
print(Test.__annotations__)
if __name__ == "__main__":
run()

View File

@@ -1,3 +0,0 @@
import math
import r
print(re.findall(r"\d+[.\d]?", "xxxxx$19.99"))