Files
Convention-Python/Convention/Runtime/Reflection.py

1530 lines
58 KiB
Python
Raw Normal View History

2025-07-09 17:39:50 +08:00
import importlib
import inspect
import types
import weakref
from enum import Enum, IntFlag
from typing import *
import typing
from .Config import *
from pydantic import BaseModel, Field, PrivateAttr
import json
import functools
import concurrent.futures
from typing import Set
type_symbols = {
'int' : int,
'float' : float,
'str' : str,
'list' : list,
'dict' : dict,
'tuple' : tuple,
'set' : set,
'bool' : bool,
'NoneType' : type(None),
}
_Internal_Reflection_Debug:bool = False
def GetInternalReflectionDebug() -> bool:
return _Internal_Reflection_Debug and GetInternalDebug()
def SetInternalReflectionDebug(debug:bool) -> None:
global _Internal_Reflection_Debug
_Internal_Reflection_Debug = debug
# 缓存get_type_from_string的结果
_type_string_cache: Dict[str, type] = {}
class ReflectionException(Exception):
def __init__(self, message:str):
self.message = f"{ConsoleFrontColor.RED}{message}{ConsoleFrontColor.RESET}"
super().__init__(self.message)
2025-07-09 23:52:24 +08:00
def String2Type(type_string:str) -> type:
2025-07-09 17:39:50 +08:00
"""
根据字符串生成类型使用缓存提高性能
"""
# 检查缓存
if type_string in _type_string_cache:
return _type_string_cache[type_string]
result = None
# 检查内置类型映射
if type_string in type_symbols:
result = type_symbols[type_string]
# 从内置类型中获取
elif type_string in dir(types):
result = getattr(types, type_string)
# 从当前全局命名空间获取
elif type_string in globals():
result = globals().get(type_string)
# 从当前模块获取
elif type_string in dir(__import__(__name__)):
result = getattr(__import__(__name__), type_string)
# 尝试从其他模块获取
else:
try:
if '.' not in type_string:
raise ValueError(f"Empty module name, type_string is {type_string}")
module_name, _, class_name = type_string.rpartition('.')
if not module_name:
raise ValueError(f"Empty module name, type_string is {type_string}")
# 首先尝试直接获取模块
try:
module = sys.modules[module_name]
except KeyError:
# 模块未加载,需要导入
module = importlib.import_module(module_name)
result = getattr(module, class_name)
except (ImportError, AttributeError, ValueError) as ex:
raise TypeError(f"Cannot find type '{type_string}', type_string is <{type_string}>") from ex
# 更新缓存
if result is not None:
_type_string_cache[type_string] = result
2025-07-09 23:52:24 +08:00
return result
raise TypeError(f"Cannot find type '{type_string}', type_string is <{type_string}>")
2025-07-09 17:39:50 +08:00
2025-07-10 15:08:20 +08:00
@functools.lru_cache(maxsize=1024)
2025-07-09 23:52:24 +08:00
def StringWithModel2Type(type_string:str, module_name:str) -> type|None:
2025-07-09 17:39:50 +08:00
'''
根据字符串生成类型带模块名参数使用缓存
'''
# 检查内置类型映射
if type_string in type_symbols:
return type_symbols[type_string]
# 尝试从指定模块获取
try:
module = sys.modules.get(module_name)
if module and type_string in dir(module):
return getattr(module, type_string)
except (KeyError, AttributeError):
pass
# 尝试从类型模块获取
if type_string in dir(types):
return getattr(types, type_string)
return None
# 获取泛型参数
2025-07-09 23:52:24 +08:00
def GetGenericArgs(type_hint: type | Any) -> tuple[type | None, tuple[type, ...] | None]:
2025-07-09 17:39:50 +08:00
origin = get_origin(type_hint) # 获取原始类型
args = get_args(type_hint) # 获取泛型参数
if origin is None:
return None, None
return origin, args
2025-07-09 23:52:24 +08:00
def IsGeneric(type_hint: type | Any) -> bool:
2025-07-09 17:39:50 +08:00
return "__origin__" in dir(type_hint)
class _SpecialIndictaor:
pass
class ListIndictaor(_SpecialIndictaor):
elementType:type
def __init__(self, elementType:type):
self.elementType = elementType
def __repr__(self) -> str:
return f"ListIndictaor<elementType={self.elementType}>"
def __str__(self) -> str:
return self.__repr__()
def __hash__(self) -> int:
return hash(List[self.elementType])
class DictIndictaor(_SpecialIndictaor):
keyType:type
valueType:type
def __init__(self, keyType:type, valueType:type):
self.keyType = keyType
self.valueType = valueType
def __repr__(self) -> str:
return f"DictIndictaor<keyType={self.keyType}, valueType={self.valueType}>"
def __str__(self) -> str:
return self.__repr__()
def __hash__(self) -> int:
return hash(Dict[self.keyType, self.valueType])
class TupleIndictaor(_SpecialIndictaor):
elementTypes:Tuple[type, ...]
def __init__(self, *elementTypes:type):
self.elementTypes = elementTypes
def __repr__(self) -> str:
return f"TupleIndictaor<{', '.join(map(str, self.elementTypes))}>"
def __str__(self) -> str:
return self.__repr__()
def __hash__(self) -> int:
return hash(Tuple[self.elementTypes])
class SetIndictaor(_SpecialIndictaor):
elementType:type
def __init__(self, elementType:type):
self.elementType = elementType
def __repr__(self) -> str:
return f"SetIndictaor<elementType={self.elementType}>"
def __str__(self) -> str:
return self.__repr__()
def __hash__(self) -> int:
return hash(Set[self.elementType])
# 添加记忆化装饰器
def memoize(func):
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
return wrapper
# 优化to_type函数
@memoize
2025-07-09 23:52:24 +08:00
def ToType(
2025-07-09 17:39:50 +08:00
typen: type|Any|str,
*,
module_name: str|None=None
) -> type|List[type]|_SpecialIndictaor:
# 快速路径:如果已经是类型,直接返回
if isinstance(typen, type):
return typen
elif isinstance(typen, _SpecialIndictaor):
return typen
elif isinstance(typen, str):
# 快速路径:检查字符串的格式
if '.' in typen:
# 可能是带模块名的类型
try:
module_parts = typen.split('.')
# 尝试从sys.modules中获取模块
current_module = sys.modules
for part in module_parts[:-1]:
if part in current_module:
current_module = current_module[part]
else:
# 需要导入模块
current_module = importlib.import_module('.'.join(module_parts[:-1]))
break
# 获取类型
if isinstance(current_module, dict):
# 从字典中获取
result = current_module.get(module_parts[-1])
if isinstance(result, type):
return result
else:
# 从模块对象中获取
result = getattr(current_module, module_parts[-1], None)
if isinstance(result, type):
return result
except (ImportError, AttributeError):
pass
# 回退到一般处理
import sys
if not all(c.isalnum() or c == '.' for c in typen):
raise ValueError(f"Invalid type string: {typen}, only alphanumeric characters and dots are allowed")
type_components = typen.split(".")
type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None)
type_final = type_components[-1]
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
2025-07-09 17:39:50 +08:00
f"typen: {typen}, type_components: {type_components}")
if type_module is not None:
return sys.modules[type_module].__dict__[type_final]
else:
for module in sys.modules.values():
if type_final in module.__dict__:
return module.__dict__[type_final]
2025-07-09 23:52:24 +08:00
return String2Type(typen)
elif IsUnion(typen):
uTypes = GetUnionTypes(typen)
2025-07-09 17:39:50 +08:00
uTypes = [uType for uType in uTypes if uType is not type(None)]
if len(uTypes) == 1:
return uTypes[0]
elif len(uTypes) == 0:
return type(None)
else:
return uTypes
elif hasattr(typen, '__origin__'):
oType = get_origin(typen)
if oType is list:
return ListIndictaor(get_args(typen)[0])
elif oType is dict:
return DictIndictaor(get_args(typen)[0], get_args(typen)[1])
elif oType is tuple:
return TupleIndictaor(*get_args(typen))
elif oType is set:
return SetIndictaor(get_args(typen)[0])
else:
return oType
else:
return type(typen)
2025-07-09 23:52:24 +08:00
def TryToType(typen:type|Any|str, *, module_name:str|None=None) -> type|List[type]|_SpecialIndictaor|None:
2025-07-09 17:39:50 +08:00
try:
2025-07-09 23:52:24 +08:00
return ToType(typen, module_name=module_name)
2025-07-09 17:39:50 +08:00
except Exception:
return None
2025-07-09 23:52:24 +08:00
def IsUnion(type_hint: type | Any) -> bool:
2025-07-09 17:39:50 +08:00
return "__origin__" in dir(type_hint) and type_hint.__origin__ == Union
2025-07-09 23:52:24 +08:00
def GetUnionTypes(type_hint: type | Any) -> List[type]:
2025-07-09 17:39:50 +08:00
return [t for t in type_hint.__args__]
class TypeVarIndictaor:
pass
class AnyVarIndicator:
pass
# 优化decay_type函数
@memoize
2025-07-09 23:52:24 +08:00
def DecayType(
2025-07-09 17:39:50 +08:00
type_hint: type|Any,
*,
module_name: str|None=None
) -> type|List[type]|_SpecialIndictaor:
# 快速路径:直接判断常见类型
if isinstance(type_hint, (type, _SpecialIndictaor)):
return type_hint
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
2025-07-09 23:52:24 +08:00
result: type|List[type] = None
2025-07-09 17:39:50 +08:00
# 处理字符串类型
if isinstance(type_hint, str):
try:
2025-07-09 23:52:24 +08:00
result = ToType(type_hint, module_name=module_name)
2025-07-09 17:39:50 +08:00
except TypeError:
result = Any
# 处理forward reference
elif hasattr(type_hint, "__forward_arg__"):
2025-07-09 23:52:24 +08:00
result = ToType(type_hint.__forward_arg__, module_name=module_name)
2025-07-09 17:39:50 +08:00
# 处理type类型
elif type_hint is type:
result = type_hint
# 处理union类型
2025-07-09 23:52:24 +08:00
elif IsUnion(type_hint):
result = GetUnionTypes(type_hint)
2025-07-09 17:39:50 +08:00
# 处理TypeVar
elif isinstance(type_hint, TypeVar):
result = TypeVarIndictaor
# 处理泛型类型
2025-07-09 23:52:24 +08:00
elif IsGeneric(type_hint):
2025-07-09 17:39:50 +08:00
result = get_origin(type_hint)
else:
raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>")
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
2025-07-09 17:39:50 +08:00
return result
2025-07-09 23:52:24 +08:00
def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool:
2025-07-09 17:39:50 +08:00
'''
检查成员是否只在当前类中定义而不是在父类中定义
'''
# 获取当前类的所有成员
current_members = dict(inspect.getmembers(current_class))
if member_name not in current_members:
return False
# 获取父类的所有成员
for baseType in current_class.__bases__:
parent_members = dict(inspect.getmembers(baseType))
if member_name in parent_members:
return False
return True
2025-07-09 23:52:24 +08:00
class BaseInfo(BaseModel):
def SymbolName(self) -> str:
return "BaseInfo"
def ToString(self) -> str:
return self.SymbolName()
2025-07-09 17:39:50 +08:00
class MemberInfo(BaseInfo):
_MemberName: str = PrivateAttr(default="")
_ParentType: Optional[type] = PrivateAttr(default=None)
_IsStatic: bool = PrivateAttr(default=False)
_IsPublic: bool = PrivateAttr(default=False)
def __init__(self, name:str, ctype:Optional[type], is_static:bool, is_public:bool, **kwargs):
super().__init__(**kwargs)
self._MemberName = name
self._ParentType = ctype
self._IsStatic = is_static
self._IsPublic = is_public
@property
def MemberName(self) -> str:
return self._MemberName
@property
2025-07-09 23:52:24 +08:00
def ParentType(self) -> Optional[type]:
2025-07-09 17:39:50 +08:00
return self._ParentType
@property
def IsStatic(self) -> bool:
return self._IsStatic
@property
def IsPublic(self) -> bool:
return self._IsPublic
@override
def __repr__(self) -> str:
return f"<{self.MemberName}>"
@override
def __str__(self) -> str:
return f"{self.MemberName}"
@override
def SymbolName(self) -> str:
return "MemberInfo"
@override
def ToString(self) -> str:
return f"MemberInfo<name={self.MemberName}, ctype={self.ParentType}, " \
f"{'static' if self.IsStatic else 'instance'}, {'public' if self.IsPublic else 'private'}>"
class ValueInfo(BaseInfo):
_RealType: Optional[Any] = PrivateAttr(default=None)
_IsPrimitive: bool = PrivateAttr(default=False)
_IsValueType: bool = PrivateAttr(default=False)
_IsCollection: bool = PrivateAttr(default=False)
_IsDictionary: bool = PrivateAttr(default=False)
_IsTuple: bool = PrivateAttr(default=False)
_IsSet: bool = PrivateAttr(default=False)
_IsList: bool = PrivateAttr(default=False)
_IsUnsupported: bool = PrivateAttr(default=False)
_GenericArgs: List[type] = PrivateAttr(default=[])
@property
def IsUnion(self) -> bool:
2025-07-09 23:52:24 +08:00
return IsUnion(self._RealType)
2025-07-09 17:39:50 +08:00
@property
2025-07-09 23:52:24 +08:00
def RealType(self) -> type|Any:
2025-07-09 17:39:50 +08:00
return self._RealType
@property
def IsCollection(self) -> bool:
return self._IsCollection
@property
def IsPrimitive(self) -> bool:
return self._IsPrimitive
@property
def IsValueType(self) -> bool:
return self._IsValueType
@property
def IsDictionary(self) -> bool:
return self._IsDictionary
@property
def IsTuple(self) -> bool:
return self._IsTuple
@property
def IsSet(self) -> bool:
return self._IsSet
@property
def IsList(self) -> bool:
return self._IsList
@property
def IsUnsupported(self) -> bool:
return self._IsUnsupported
@property
def GenericArgs(self) -> List[type]:
return self._GenericArgs
@property
def IsGeneric(self) -> bool:
return len(self._GenericArgs) > 0
@property
def Module(self) -> Optional[Dict[str, type]]:
return sys.modules[self.RealType.__module__]
@property
def ModuleName(self) -> str:
return self.RealType.__module__
def __init__(self, metaType:type|Any, generic_args:List[type]=[], **kwargs) -> None:
super().__init__(**kwargs)
self._RealType = metaType
if GetInternalReflectionDebug() and len(generic_args) > 0:
PrintColorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
2025-07-09 17:39:50 +08:00
f"metaType={metaType}, generic_args={generic_args}")
self._GenericArgs = generic_args
if not isinstance(metaType, type):
return
self._IsPrimitive = (
issubclass(metaType, int) or
issubclass(metaType, float) or
issubclass(metaType, str) or
issubclass(metaType, bool) or
issubclass(metaType, complex) #or
# issubclass(metaType, tuple) or
# issubclass(metaType, set) or
# issubclass(metaType, list) or
# issubclass(metaType, dict)
)
self._IsValueType = (
issubclass(metaType, int) or
issubclass(metaType, float) or
issubclass(metaType, str) or
issubclass(metaType, bool) or
issubclass(metaType, complex)
)
self._IsCollection = (
issubclass(metaType, list) or
issubclass(metaType, dict) or
issubclass(metaType, tuple) or
issubclass(metaType, set)
)
self._IsDictionary = (
issubclass(metaType, dict)
)
self._IsTuple = (
issubclass(metaType, tuple)
)
self._IsSet = (
issubclass(metaType, set)
)
self._IsList = (
issubclass(metaType, list)
)
def Verify(self, valueType:type) -> bool:
if self.IsUnsupported:
raise ReflectionException(f"Unsupported type: {self.RealType}")
if valueType is type(None):
return True
if self.IsUnion:
2025-07-09 23:52:24 +08:00
return any(ValueInfo(uType).Verify(valueType) for uType in GetUnionTypes(self.RealType))
2025-07-09 17:39:50 +08:00
elif self.RealType is Any:
return True
elif self.RealType is type(None):
return valueType is None or valueType is type(None)
else:
try:
return issubclass(valueType, self.RealType)
except Exception as e:
raise ReflectionException(f"Verify type {valueType} with {self.RealType}: \n{e}") from e
def DecayToList(self) -> List[Self]:
result:List[Self] = []
if self.IsUnion:
2025-07-09 23:52:24 +08:00
for uType in GetUnionTypes(self.RealType):
2025-07-09 17:39:50 +08:00
result.extend(ValueInfo(uType).DecayToList())
else:
result.append(self)
result = list(dict.fromkeys(result).keys())
return result
@override
def __repr__(self) -> str:
generic_args = ", ".join(self._GenericArgs)
return f"ValueInfo<{self.RealType}{f'[{generic_args}]' if generic_args else ''}>"
@override
def SymbolName(self) -> str:
return "ValueInfo"
@override
def ToString(self) -> str:
generic_args = ", ".join(self._GenericArgs)
return f"<{self.RealType}{f'[{generic_args}]' if generic_args else ''}>"
@staticmethod
def Create(
metaType: type|Any,
*,
module_name: Optional[str] = None,
SelfType: type|Any|None = None,
**kwargs
2025-07-09 23:52:24 +08:00
) -> 'ValueInfo':
2025-07-09 17:39:50 +08:00
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
2025-07-09 17:39:50 +08:00
f"metaType={metaType}, SelfType={SelfType}")
if isinstance(metaType, type):
if metaType is list:
return ValueInfo(list, [Any])
elif metaType is dict:
return ValueInfo(dict, [Any, Any])
elif metaType is tuple:
return ValueInfo(tuple, [])
elif metaType is set:
return ValueInfo(set, [Any])
else:
return ValueInfo(metaType, **kwargs)
elif isinstance(metaType, str):
2025-07-09 23:52:24 +08:00
type_ = TryToType(metaType, module_name=module_name)
2025-07-09 17:39:50 +08:00
if type_ is None:
return ValueInfo(metaType, **kwargs)
else:
return ValueInfo(type_, **kwargs)
2025-07-24 11:45:44 +08:00
elif metaType is Self:
2025-07-09 17:39:50 +08:00
if SelfType is None:
raise ReflectionException("SelfType is required when metaType is <Self>")
return ValueInfo.Create(SelfType, **kwargs)
elif isinstance(metaType, TypeVar):
2025-07-09 23:52:24 +08:00
gargs = GetGenericArgs(metaType)
2025-07-09 17:39:50 +08:00
if len(gargs) == 1:
return ValueInfo(gargs[0], **kwargs)
else:
return ValueInfo(Any, **kwargs)
elif hasattr(metaType, '__origin__'):
oType = get_origin(metaType)
if oType is list:
return ValueInfo(list, [get_args(metaType)[0]])
elif oType is dict:
return ValueInfo(dict, [get_args(metaType)[0], get_args(metaType)[1]])
elif oType is tuple:
2025-07-09 23:52:24 +08:00
return ValueInfo(tuple, list(get_args(metaType)))
2025-07-09 17:39:50 +08:00
elif oType is set:
return ValueInfo(set, [get_args(metaType)[0]])
return ValueInfo(metaType, **kwargs)
class FieldInfo(MemberInfo):
_MetaType: Optional[ValueInfo] = PrivateAttr(default=None)
def __init__(
self,
metaType: Any,
name: str,
ctype: type,
is_static: bool,
is_public: bool,
module_name: Optional[str] = None,
selfType: type|Any|None = None
):
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
2025-07-09 17:39:50 +08:00
f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ")
super().__init__(
name = name,
ctype = ctype,
is_static = is_static,
is_public = is_public,
)
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
2025-07-09 17:39:50 +08:00
f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}")
@property
def IsUnion(self) -> bool:
2025-07-09 23:52:24 +08:00
return self.ValueType.IsUnion
2025-07-09 17:39:50 +08:00
@property
def FieldName(self) -> str:
'''
字段名称
'''
return self.MemberName
@property
2025-07-09 23:52:24 +08:00
def ValueType(self) -> ValueInfo:
2025-07-09 17:39:50 +08:00
return self._MetaType
@property
def FieldType(self):
'''
字段类型
'''
2025-07-09 23:52:24 +08:00
return self.ValueType.RealType
2025-07-09 17:39:50 +08:00
def Verify(self, valueType:type) -> bool:
2025-07-09 23:52:24 +08:00
return self.ValueType.Verify(valueType)
2025-07-09 17:39:50 +08:00
def GetValue(self, obj:Any) -> Any:
if self.IsStatic:
return getattr(self.ParentType, self.MemberName)
else:
if not isinstance(obj, self.ParentType):
raise TypeError(f"{ConsoleFrontColor.RED}Field {ConsoleFrontColor.LIGHTBLUE_EX}{self.MemberName}"\
f"{ConsoleFrontColor.RED} , parent type mismatch, expected {self.ParentType}, got {type(obj)}"\
f"{ConsoleFrontColor.RESET}")
return getattr(obj, self.MemberName)
2025-07-09 23:52:24 +08:00
2025-07-09 17:39:50 +08:00
def SetValue(self, obj:Any, value:Any) -> None:
if self.IsStatic:
if self.Verify(type(value)):
setattr(self.ParentType, self.MemberName, value)
else:
raise TypeError(f"Value type mismatch, expected {self.MetaType.RealType}, got {type(value)}")
else:
if not isinstance(obj, self.ParentType):
raise TypeError(f"Parent type mismatch, expected {self.ParentType}, got {type(obj)}")
if self.Verify(type(value)):
setattr(obj, self.MemberName, value)
else:
raise TypeError(f"{ConsoleFrontColor.RED}Field {ConsoleFrontColor.LIGHTBLUE_EX}{self.MemberName}"\
f"{ConsoleFrontColor.RED} , value type mismatch, expected \"{self.FieldType}\""\
f", got {type(value)}{ConsoleFrontColor.RESET}")
@override
def __repr__(self) -> str:
return f"<{self.MemberName} type={self.FieldType}>"
@override
def SymbolName(self) -> str:
return "FieldInfo"
@override
def ToString(self) -> str:
return f"FieldInfo<name={self.MemberName}, type={self.FieldType}, ctype={self.ParentType}, "\
f"{("generics="+str(self.ValueType.GenericArgs)+", ") if self.ValueType.IsGeneric else ''}" \
f"{'static' if self.IsStatic else 'instance'}, {'public' if self.IsPublic else 'private'}>"
class ParameterInfo(BaseInfo):
_MetaType: Optional[ValueInfo] = PrivateAttr(default=None)
_ParameterName: str = PrivateAttr(default="")
_IsOptional: bool = PrivateAttr(default=False)
_DefaultValue: Any = PrivateAttr(default=None)
def __init__(
self,
metaType: Any,
name: str,
is_optional: bool,
default_value: Any,
module_name: Optional[str] = None,
selfType: type|Any|None = None,
**kwargs
):
super().__init__(**kwargs)
self._ParameterName = name
self._IsOptional = is_optional
self._DefaultValue = default_value
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
@property
def ValueType(self):
return self._MetaType
@property
def ParameterName(self) -> str:
return self._ParameterName
@property
def ParameterType(self):
return self._MetaType.RealType
@property
def IsOptional(self) -> bool:
return self._IsOptional
@property
def DefaultValue(self) -> Any:
return self._DefaultValue
def Verify(self, valueType:type) -> bool:
return self._MetaType.Verify(valueType)
@override
def __repr__(self) -> str:
return f"<{self.ParameterName}>"
@override
def SymbolName(self) -> str:
return "ParameterInfo"
@override
def ToString(self) -> str:
return f"ParameterInfo<name={self.ParameterName}, type={self.ParameterType}, " \
f"{'optional' if self.IsOptional else 'required'}, default={self.DefaultValue}>"
class MethodInfo(MemberInfo):
_ReturnType: Optional[ValueInfo] = PrivateAttr(default=None)
_Parameters: List[ParameterInfo] = PrivateAttr(default=[])
_PositionalParameters: List[ParameterInfo] = PrivateAttr(default=[])
_KeywordParameters: List[ParameterInfo] = PrivateAttr(default=[])
_IsClassMethod: bool = PrivateAttr(default=False)
def __init__(
self,
return_type: Any,
parameters: List[ParameterInfo],
positional_parameters: List[ParameterInfo],
keyword_parameters: List[ParameterInfo],
name: str,
ctype: type,
is_static: bool,
is_public: bool,
is_class_method: bool,
):
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
2025-07-09 17:39:50 +08:00
f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})")
MemberInfo.__init__(self, name, ctype, is_static, is_public)
self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType)
self._Parameters = parameters
self._PositionalParameters = positional_parameters
self._KeywordParameters = keyword_parameters
self._IsClassMethod = is_class_method
@property
def ReturnType(self) -> ValueInfo:
return self._ReturnType.RealType
@property
def Parameters(self) -> List[ParameterInfo]:
return self._Parameters
@property
def PositionalParameters(self) -> List[ParameterInfo]:
return self._PositionalParameters
@property
def KeywordParameters(self) -> List[ParameterInfo]:
return self._KeywordParameters
@property
def IsClassMethod(self) -> bool:
return self._IsClassMethod
@overload
def Invoke(self, obj:object, *args, **kwargs) -> object:
'''
调用实例方法
'''
...
@overload
def Invoke(self, obj:type, *args, **kwargs) -> object:
'''
调用类方法
'''
...
@overload
def Invoke(self, noneObj:Literal[None]|None, *args, **kwargs) -> object:
'''
调用静态方法
'''
...
def Invoke(self, obj:object|type, *args, **kwargs) -> object:
if not self.IsStatic and obj is None:
raise TypeError("Object is None")
if not self.IsStatic and not self.IsClassMethod and not isinstance(obj, self.ParentType):
raise TypeError(f"Parent type mismatch, expected {self.ParentType}, got {type(obj)}")
if self.IsClassMethod and not isinstance(obj, type):
raise TypeError(f"Class method expected type, got {type(obj)}: {obj}")
result = None
if self.IsStatic:
method = getattr(self.ParentType, self.MemberName)
if method is None:
raise AttributeError(f"{self.ParentType} type has no method '{self.MemberName}'")
result = method(*args, **kwargs)
elif self.IsClassMethod:
method = getattr(obj, self.MemberName)
if method is None:
raise AttributeError(f"{obj} class has no method '{self.MemberName}'")
result = method(*args, **kwargs)
else:
method = getattr(obj, self.MemberName)
if method is None:
raise AttributeError(f"{self.ParentType} type has no method '{self.MemberName}'")
result = method(*args, **kwargs)
return result
@override
def SymbolName(self) -> str:
return "MethodInfo"
@override
def ToString(self) -> str:
return f"MethodInfo<name={self.MemberName}, return={self.ReturnType}, ctype={self.ParentType}" \
f"{', static' if self.IsStatic else ''}{', class' if self.IsClassMethod else ''}, {'public' if self.IsPublic else 'private'}, " \
f"params_count={len(self.Parameters)}>"
@classmethod
def Create(
cls,
name: str,
method: Callable,
ctype: Optional[type] = None,
module_name: Optional[str] = None
2025-07-09 23:52:24 +08:00
) -> 'MethodInfo':
2025-07-09 17:39:50 +08:00
'''
创建MethodInfo对象
name: 方法名
method: 方法对象
ctype: 方法所属的类
module_name: 模块名
'''
# 获取方法签名
sig = inspect.signature(method)
is_static = isinstance(method, staticmethod)
is_class_method = isinstance(method, classmethod)
is_public = (name.startswith("__") and name.endswith("__")) or not name.startswith('_')
# 构建参数列表
parameters:List[ParameterInfo] = []
positional_parameters:List[ParameterInfo] = []
keyword_parameters:List[ParameterInfo] = []
for param_name, param in sig.parameters.items():
if param_name in ('self', 'cls'):
continue
ptype = param.annotation if param.annotation != inspect.Parameter.empty else Any
ptype = ptype if isinstance(ptype, type) else Any
param_info = ParameterInfo(
metaType = ptype,
name = param_name,
is_optional = param.default != inspect.Parameter.empty,
default_value = param.default if param.default != inspect.Parameter.empty else None,
module_name = module_name,
selfType=ctype
)
parameters.append(param_info)
if param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD):
positional_parameters.append(param_info)
elif param.kind == inspect.Parameter.KEYWORD_ONLY:
keyword_parameters.append(param_info)
# 构建方法信息
return MethodInfo(
return_type = sig.return_annotation if sig.return_annotation != inspect.Signature.empty else Any,
parameters = parameters,
positional_parameters = positional_parameters,
keyword_parameters = keyword_parameters,
name = name,
ctype = ctype,
is_static = is_static,
is_public = is_public,
is_class_method = is_class_method
)
class RefTypeFlag(IntFlag):
Static:int = 0b00000001
Instance:int = 0b00000010
Public:int = 0b00000100
Private:int = 0b00001000
Default:int = 0b00010000
Method:int = 0b00100000
Field:int = 0b01000000
Special:int = 0b10000000
All:int = 0b11111111
_RefTypeLock:Dict[Any|type|_SpecialIndictaor, threading.Lock] = {}
class RefType(ValueInfo):
_FieldInfos: List[FieldInfo] = PrivateAttr()
_MethodInfos: List[MethodInfo] = PrivateAttr()
_MemberNames: List[str] = PrivateAttr()
2025-07-09 23:52:24 +08:00
_BaseTypes: List[Self] = PrivateAttr(default=[])
2025-07-09 17:39:50 +08:00
_initialized: bool = PrivateAttr(default=False)
_BaseMemberNamesSet: Set[str] = PrivateAttr(default_factory=set)
_member_cache: Dict[Tuple[str, RefTypeFlag], Optional[MemberInfo]] = PrivateAttr(default_factory=dict)
def __init__(self, metaType:type|_SpecialIndictaor):
if isinstance(metaType, ListIndictaor):
super().__init__(list, generic_args=[metaType.elementType])
metaType = list
elif isinstance(metaType, DictIndictaor):
super().__init__(dict, generic_args=[metaType.keyType, metaType.valueType])
metaType = dict
elif isinstance(metaType, TupleIndictaor):
2025-07-09 23:52:24 +08:00
super().__init__(tuple, generic_args=list(metaType.elementTypes))
2025-07-09 17:39:50 +08:00
metaType = tuple
elif isinstance(metaType, SetIndictaor):
super().__init__(set, generic_args=[metaType.elementType])
metaType = set
2025-07-09 23:52:24 +08:00
elif IsGeneric(metaType):
2025-07-09 17:39:50 +08:00
raise NotImplementedError("Generic type is not supported")
else:
super().__init__(metaType)
self._FieldInfos = []
self._MethodInfos = []
self._MemberNames = []
self._BaseMemberNamesSet = set()
self._member_cache = {}
# 延迟初始化标志
self._initialized = False
def _ensure_initialized(self):
"""确保完全初始化,实现延迟加载"""
# 初始化锁, 防止同个RefType在多线程中被重复初始化
if self.RealType not in _RefTypeLock:
_RefTypeLock[self.RealType] = threading.Lock()
lock_guard(_RefTypeLock[self.RealType])
if self._initialized:
return
metaType = self.RealType
# 初始化基类列表 - 只初始化一次
if self._BaseTypes is None:
self._BaseTypes = []
for baseType in metaType.__bases__:
if baseType == metaType: # 避免循环引用
continue
self._BaseTypes.append(TypeManager.GetInstance().CreateOrGetRefType(baseType))
# 如果BaseMemberNamesSet为空则填充它
if not self._BaseMemberNamesSet:
# 使用更高效的方式收集基类成员名称
for baseType in self._BaseTypes:
# 确保基类已初始化
baseType._ensure_initialized()
# 直接合并集合,而不是逐个添加
self._BaseMemberNamesSet.update(baseType._MemberNames)
self._BaseMemberNamesSet.update(baseType._BaseMemberNamesSet)
class_var = metaType.__dict__
# 只在需要时获取类型注解
annotations_dict = {}
try:
annotations_dict = get_type_hints(metaType)
except (TypeError, NameError):
# 类型提示获取失败时,使用空字典
pass
# 减少函数调用开销的辅助函数,改为内联处理
methods_info = []
method_names = []
fields_info = self._FieldInfos.copy() # 保留extensionFields
field_names = []
# 一次性收集所有成员,避免多次遍历
members = inspect.getmembers(metaType)
# 处理方法
for name, member in members:
if name not in self._BaseMemberNamesSet:
if inspect.ismethod(member) or inspect.isfunction(member):
methods_info.append(MethodInfo.Create(name, member, ctype=metaType, module_name=self.ModuleName))
method_names.append(name)
# 处理字段 (非方法成员)
elif not name.startswith('__'): # 排除魔术方法相关的属性
is_static = name in class_var
is_public = (name.startswith('__') and name.endswith('__')) or not name.startswith('_')
fieldType = annotations_dict.get(name, Any)
field_info = FieldInfo(
metaType = fieldType,
name = name,
ctype = metaType,
is_static = is_static,
is_public = is_public,
module_name = self.ModuleName,
selfType=metaType
)
fields_info.append(field_info)
field_names.append(name)
# 处理BaseModel字段 - 这些通常有特殊的处理
if issubclass(metaType, BaseModel):
try:
fields = getattr(metaType, 'model_fields', getattr(metaType, '__pydantic_fields__', {}))
for field_name, model_field in fields.items():
if field_name not in self._BaseMemberNamesSet and field_name not in field_names:
fieldType = model_field.annotation if model_field.annotation is not None else Any
is_public = not getattr(model_field, 'exclude', False)
field_info = FieldInfo(
metaType=fieldType,
name=field_name,
ctype=metaType,
is_public=is_public,
is_static=False,
module_name=self.ModuleName,
selfType=metaType
)
fields_info.append(field_info)
field_names.append(field_name)
except (AttributeError, TypeError):
pass # 忽略BaseModel相关错误
# 处理注释中的字段 - 只处理尚未添加的字段
for name, annotation in annotations_dict.items():
if name not in self._BaseMemberNamesSet and name not in field_names and not name.startswith('__'):
field_info = FieldInfo(
2025-07-09 23:52:24 +08:00
metaType=DecayType(annotation),
2025-07-09 17:39:50 +08:00
name=name,
ctype=metaType,
is_static=False,
is_public=not name.startswith('_'),
module_name=self.ModuleName,
selfType=metaType
)
fields_info.append(field_info)
field_names.append(name)
# 更新成员列表
self._MethodInfos = methods_info
self._FieldInfos = fields_info
self._MemberNames = method_names + field_names
self._initialized = True
def _where_member(self, member:MemberInfo, flag:RefTypeFlag) -> bool:
# 如果是默认标志,直接根据常见条件判断,避免位运算
if flag == RefTypeFlag.Default:
if isinstance(member, MethodInfo):
return member.IsPublic and (member.IsInstance or member.IsStatic)
elif isinstance(member, FieldInfo):
return member.IsPublic and member.IsInstance
return False
# 否则进行完整的标志检查
stats = True
if member.IsStatic:
stats &= (flag & RefTypeFlag.Static != 0)
else:
stats &= (flag & RefTypeFlag.Instance != 0)
if member.IsPublic:
stats &= (flag & RefTypeFlag.Public != 0)
else:
stats &= (flag & RefTypeFlag.Private != 0)
if isinstance(member, MethodInfo):
stats &= (flag & RefTypeFlag.Method != 0)
elif isinstance(member, FieldInfo):
stats &= (flag & RefTypeFlag.Field != 0)
if member.MemberName.startswith('__') and member.MemberName.endswith('__'):
stats &= (flag & RefTypeFlag.Special != 0)
return stats
# 修改获取成员方法,使用缓存
def GetField(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[FieldInfo]:
cache_key = (name, flags)
if cache_key in self._member_cache:
return self._member_cache[cache_key]
result = next((field for field in self.GetFields(flags) if field.MemberName == name), None)
self._member_cache[cache_key] = result
return result
def GetMethod(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[MethodInfo]:
cache_key = (name, flags)
if cache_key in self._member_cache:
return self._member_cache[cache_key]
result = next((method for method in self.GetMethods(flags) if method.MemberName == name), None)
self._member_cache[cache_key] = result
return result
def GetMember(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[MemberInfo]:
cache_key = (name, flags)
if cache_key in self._member_cache:
return self._member_cache[cache_key]
result = next((member for member in self.GetMembers(flags) if member.MemberName == name), None)
self._member_cache[cache_key] = result
return result
def GetFieldValue[T](self, obj:object, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> T:
field = self.GetField(name, flags)
if field is not None:
return field.GetValue(obj)
else:
raise ReflectionException(f"Field {name} not found")
def SetFieldValue[T](self, obj:object, name:str, value:T, flags:RefTypeFlag=RefTypeFlag.Default) -> None:
field = self.GetField(name, flags)
if field is not None:
field.SetValue(obj, value)
else:
raise ReflectionException(f"Field {name} not found")
def InvokeMethod[T](self, obj:object, name:str, flags:RefTypeFlag=RefTypeFlag.Default, *args, **kwargs) -> T:
method = self.GetMethod(name, flags)
if method is not None:
return method.Invoke(obj, *args, **kwargs)
else:
raise ReflectionException(f"Method {name} not found")
def CreateInstance(self, *args, **kwargs) -> object:
return self.RealType(*args, **kwargs)
@override
def __repr__(self) -> str:
return f"RefType<{self.RealType}{f'<{self.GenericArgs}>' if self.IsGeneric else ''}>"
@override
def SymbolName(self) -> str:
return "RefType"
@override
def ToString(self) -> str:
return f"RefType<type={self.RealType}, generic={self.GenericArgs}>"
2025-07-09 23:52:24 +08:00
def Print2Str(self, verbose:bool=False, flags:RefTypeFlag=RefTypeFlag.Default) -> str:
2025-07-09 17:39:50 +08:00
fields: List[str] = []
methods: List[str] = []
for field in self.GetFields(flags):
fields.append(f"{ConsoleFrontColor.GREEN if field.IsPublic else ConsoleFrontColor.RED}"\
f"{field.ToString() if verbose else field.MemberName}{ConsoleFrontColor.RESET}")
for method in self.GetMethods(flags):
methods.append(f"{ConsoleFrontColor.YELLOW if method.IsPublic else ConsoleFrontColor.RED}"\
f"{method.ToString() if verbose else method.MemberName}{ConsoleFrontColor.RESET}")
return f"RefType<type={self.RealType}{', fields=' if len(fields)!=0 else ''}{', '.join(fields)}{ConsoleFrontColor.RESET}"\
f"{', methods=' if len(methods)!=0 else ''}{', '.join(methods)}{ConsoleFrontColor.RESET}>"
2025-07-09 23:52:24 +08:00
def Print2Tree(self, indent:int=4) -> str:
2025-07-09 17:39:50 +08:00
type_set: set = set()
def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]:
if currentType.IsPrimitive:
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
2025-07-09 17:39:50 +08:00
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return f"{currentType.RealType}"
elif currentType.RealType in type_set:
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
2025-07-09 17:39:50 +08:00
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return {
"type": f"{currentType.RealType}",
"value": { field.FieldName: f"{field.FieldType}" for field in currentType.GetFields() }
}
else:
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
2025-07-09 17:39:50 +08:00
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
type_set.add(currentType.RealType)
value = {}
fields = currentType.GetFields()
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
2025-07-09 17:39:50 +08:00
for field in fields:
value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType))
return {
"type": f"{currentType.RealType}",
"value": value
}
return json.dumps(dfs(self), indent=indent)
@override
def __hash__(self) -> int:
"""使RefType对象可哈希基于RealType和GenericArgs的哈希值"""
return hash((self.RealType, tuple(self.GenericArgs)))
@override
def __eq__(self, other) -> bool:
"""比较两个RefType对象是否相等基于RealType的比较"""
if not isinstance(other, RefType):
return False
return self.RealType == other.RealType
# 添加新的优化方法,避免重复检查
2025-07-09 23:52:24 +08:00
def _InitBaseTypesIfNeeded(self):
2025-07-09 17:39:50 +08:00
"""初始化基类类型,只在需要时执行"""
if self._BaseTypes is None:
self._BaseTypes = []
metaType = self.RealType
for baseType in metaType.__bases__:
# 避免循环引用如果baseType是自己则跳过
if baseType == metaType:
continue
self._BaseTypes.append(TypeManager.GetInstance().CreateOrGetRefType(baseType))
# 确保正确地实现所有GetBase*方法
@functools.lru_cache(maxsize=128)
2025-07-24 11:45:44 +08:00
def _GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
2025-07-09 17:39:50 +08:00
if self._BaseTypes is None:
2025-07-09 23:52:24 +08:00
self._InitBaseTypesIfNeeded()
2025-07-09 17:39:50 +08:00
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetFields(flag))
return result
2025-07-24 11:45:44 +08:00
def GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
return self._GetBaseFields(flag)
2025-07-09 17:39:50 +08:00
@functools.lru_cache(maxsize=128)
2025-07-24 11:45:44 +08:00
def _GetAllBaseFields(self) -> List[FieldInfo]:
2025-07-09 17:39:50 +08:00
if self._BaseTypes is None:
2025-07-09 23:52:24 +08:00
self._InitBaseTypesIfNeeded()
2025-07-09 17:39:50 +08:00
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetAllFields())
return result
2025-07-24 11:45:44 +08:00
def GetAllBaseFields(self) -> List[FieldInfo]:
return self._GetAllBaseFields()
2025-07-09 17:39:50 +08:00
# 修改所有的GetBase*方法
@functools.lru_cache(maxsize=128)
2025-07-24 11:45:44 +08:00
def _GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
2025-07-09 17:39:50 +08:00
if self._BaseTypes is None:
2025-07-09 23:52:24 +08:00
self._InitBaseTypesIfNeeded()
2025-07-09 17:39:50 +08:00
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetMethods(flag))
return result
2025-07-24 11:45:44 +08:00
def GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
return self._GetBaseMethods(flag)
2025-07-09 17:39:50 +08:00
@functools.lru_cache(maxsize=128)
2025-07-24 11:45:44 +08:00
def _GetAllBaseMethods(self) -> List[MethodInfo]:
2025-07-09 17:39:50 +08:00
if self._BaseTypes is None:
2025-07-09 23:52:24 +08:00
self._InitBaseTypesIfNeeded()
2025-07-09 17:39:50 +08:00
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetAllMethods())
return result
2025-07-24 11:45:44 +08:00
def GetAllBaseMethods(self) -> List[MethodInfo]:
return self._GetAllBaseMethods()
2025-07-09 17:39:50 +08:00
@functools.lru_cache(maxsize=128)
2025-07-24 11:45:44 +08:00
def _GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
2025-07-09 17:39:50 +08:00
if self._BaseTypes is None:
2025-07-09 23:52:24 +08:00
self._InitBaseTypesIfNeeded()
2025-07-09 17:39:50 +08:00
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetMembers(flag))
return result
2025-07-24 11:45:44 +08:00
def GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
return self._GetBaseMembers(flag)
2025-07-09 17:39:50 +08:00
@functools.lru_cache(maxsize=128)
2025-07-24 11:45:44 +08:00
def _GetAllBaseMembers(self) -> List[MemberInfo]:
2025-07-09 17:39:50 +08:00
if self._BaseTypes is None:
2025-07-09 23:52:24 +08:00
self._InitBaseTypesIfNeeded()
2025-07-09 17:39:50 +08:00
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetAllMembers())
return result
2025-07-24 11:45:44 +08:00
def GetAllBaseMembers(self) -> List[MemberInfo]:
return self._GetAllBaseMembers()
2025-07-09 17:39:50 +08:00
def GetFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
self._ensure_initialized()
if flag == RefTypeFlag.Default:
result = [field for field in self._FieldInfos
if self._where_member(field, RefTypeFlag.Field|RefTypeFlag.Public|RefTypeFlag.Instance)]
else:
result = [field for field in self._FieldInfos if self._where_member(field, flag)]
result.extend(self.GetBaseFields(flag))
return result
def GetAllFields(self) -> List[FieldInfo]:
self._ensure_initialized()
result = self._FieldInfos.copy()
result.extend(self.GetAllBaseFields())
return result
def GetMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
self._ensure_initialized()
if flag == RefTypeFlag.Default:
result = [method for method in self._MethodInfos
if self._where_member(method, RefTypeFlag.Method|RefTypeFlag.Public|RefTypeFlag.Instance|RefTypeFlag.Static)]
else:
result = [method for method in self._MethodInfos if self._where_member(method, flag)]
result.extend(self.GetBaseMethods(flag))
return result
def GetAllMethods(self) -> List[MethodInfo]:
self._ensure_initialized()
result = self._MethodInfos.copy()
result.extend(self.GetAllBaseMethods())
return result
def GetMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
self._ensure_initialized()
if flag == RefTypeFlag.Default:
result = [member for member in self._FieldInfos + self._MethodInfos
if self._where_member(member, RefTypeFlag.Public|RefTypeFlag.Instance|RefTypeFlag.Field|RefTypeFlag.Method)]
else:
result = [member for member in self._FieldInfos + self._MethodInfos if self._where_member(member, flag)]
result.extend(self.GetBaseMembers(flag))
return result
def GetAllMembers(self) -> List[MemberInfo]:
self._ensure_initialized()
result = self._FieldInfos + self._MethodInfos
result.extend(self.GetAllBaseMembers())
return result
type RTypen[_T] = RefType
'''
RTypen[T] T 类型的 RefType
'''
_Internal_TypeManager:Optional['TypeManager'] = None
2025-07-09 23:52:24 +08:00
class TypeManager(BaseModel):
2025-07-09 17:39:50 +08:00
_RefTypes: Dict[type|_SpecialIndictaor, RefType] = PrivateAttr(default_factory=dict)
_is_preheated: bool = PrivateAttr(default=False)
_weak_refs: Dict[int, "weakref.ref[RefType]"] = PrivateAttr(default_factory=dict) # 使用真正的弱引用
_type_name_cache: Dict[str, type] = PrivateAttr(default_factory=dict) # 类型名称到类型的缓存
_string_to_type_cache: Dict[str, Any] = PrivateAttr(default_factory=dict) # 字符串到类型的缓存
@classmethod
2025-07-09 23:52:24 +08:00
def GetInstance(cls) -> 'TypeManager':
2025-07-09 17:39:50 +08:00
global _Internal_TypeManager
if _Internal_TypeManager is None:
_Internal_TypeManager = cls()
_Internal_TypeManager._preheat_cache()
return _Internal_TypeManager
def _preheat_cache(self):
"""预热缓存为常用类型预先创建RefType"""
if self._is_preheated:
return
# 常用的基础类型列表
common_types = [
int, float, str, bool, list, dict, tuple, set,
2025-07-09 23:52:24 +08:00
object, type, None.__class__, Exception, BaseModel
2025-07-09 17:39:50 +08:00
]
# 预加载的类型和它们之间常见的关系,减少后续运行时计算
for t in common_types:
self.CreateRefType(t)
# 同时缓存类型名称到类型的映射
self._type_name_cache[t.__name__] = t
# 也缓存类型字符串到类型的映射
self._string_to_type_cache[t.__name__] = t
# 缓存模块全限定名
if t.__module__ != 'builtins':
full_name = f"{t.__module__}.{t.__name__}"
self._string_to_type_cache[full_name] = t
self._is_preheated = True
def AllRefTypes(self) -> Tuple[RefType, ...]:
return tuple(self._RefTypes.values())
@staticmethod
#@functools.lru_cache(maxsize=256)
def _TurnToType(data:Any, module_name:Optional[str]=None) -> type|_SpecialIndictaor:
"""将任意数据转换为类型,增加缓存以提高性能"""
metaType:type|_SpecialIndictaor = None
# 快速路径:如果已经是类型,直接返回
if isinstance(data, type) or isinstance(data, _SpecialIndictaor):
return data
# 处理字符串类型
if isinstance(data, str):
# 尝试使用模块名解析类型
if module_name is not None:
try:
return sys.modules[module_name].__dict__[data]
except (KeyError, AttributeError):
pass
# 尝试使用to_type函数
try:
2025-07-09 23:52:24 +08:00
return ToType(data, module_name=module_name)
2025-07-09 17:39:50 +08:00
except Exception:
pass
# 尝试使用try_to_type函数作为回退
2025-07-09 23:52:24 +08:00
metaType = TryToType(data, module_name=module_name)
2025-07-09 17:39:50 +08:00
if metaType is None or isinstance(metaType, list):
metaType = data
return metaType
@overload
def CreateOrGetRefType(
self,
type_: type,
module_name: Optional[str] = None
) -> RefType:
...
@overload
def CreateOrGetRefType(
self,
obj: object,
module_name: Optional[str] = None
) -> RefType:
...
@overload
def CreateOrGetRefType(
self,
type_str: str,
module_name: Optional[str] = None
) -> RefType:
...
def CreateOrGetRefType(
self,
data,
module_name: Optional[str] = None
) -> RefType:
"""创建或获取RefType实例使用多级缓存提高性能"""
if data is None:
raise ReflectionException("data is None")
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
2025-07-09 17:39:50 +08:00
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache:
data = self._string_to_type_cache[data]
# 获取或转换为类型
metaType:type|_SpecialIndictaor = TypeManager._TurnToType(data, module_name=module_name)
# 首先尝试从弱引用缓存中获取
type_id = id(metaType)
if type_id in self._weak_refs:
ref_type = self._weak_refs[type_id]()
if ref_type is not None:
return ref_type
else:
# 如果弱引用已被回收,则从字典中删除
del self._weak_refs[type_id]
# 然后尝试从常规缓存中获取
try:
ref_type = self._RefTypes[metaType]
# 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type)
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Get "\
2025-07-09 17:39:50 +08:00
f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\
f"{ConsoleFrontColor.RESET}{ref_type.ToString()}")
return ref_type
except KeyError:
ref_type = self.CreateRefType(metaType, module_name=module_name)
# 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type)
return ref_type
@overload
def CreateRefType(
self,
type_: type,
module_name: Optional[str] = None
) -> RefType:
...
@overload
def CreateRefType(
self,
obj: object,
module_name: Optional[str] = None
) -> RefType:
...
@overload
def CreateRefType(
self,
type_str: str,
module_name: Optional[str] = None
) -> RefType:
...
def CreateRefType(
self,
data,
module_name: Optional[str] = None
) -> RefType:
"""创建新的RefType实例"""
if data is None:
raise ReflectionException("data is None")
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache:
data = self._string_to_type_cache[data]
metaType:type|_SpecialIndictaor = TypeManager._TurnToType(data, module_name=module_name)
# 如果是字符串类型,缓存结果以供将来使用
if isinstance(data, str) and not data in self._string_to_type_cache:
self._string_to_type_cache[data] = metaType
try:
ref_type = RefType(metaType)
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Create "\
2025-07-09 17:39:50 +08:00
f"{ConsoleFrontColor.RESET}{metaType} "\
f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}")
self._RefTypes[metaType] = ref_type
return ref_type
except Exception as e:
raise ReflectionException(f"Create RefType failed: {e}")
def CreateOrGetRefTypeFromType(self, type_:type|_SpecialIndictaor) -> RefType:
"""快速路径直接从类型创建或获取RefType"""
if type_ in self._RefTypes:
return self._RefTypes[type_]
else:
return self.CreateRefType(type_)
def CreateRefTypeFromType(self, type_:type|_SpecialIndictaor) -> RefType:
"""直接从类型创建RefType"""
result = self._RefTypes[type_] = RefType(type_)
return result