Files

1530 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import importlib
import inspect
import types
import weakref
from enum import Enum, IntFlag
from typing import *
import typing
from .Config import *
from pydantic import BaseModel, Field, PrivateAttr
import json
import functools
import concurrent.futures
from typing import Set
type_symbols = {
'int' : int,
'float' : float,
'str' : str,
'list' : list,
'dict' : dict,
'tuple' : tuple,
'set' : set,
'bool' : bool,
'NoneType' : type(None),
}
_Internal_Reflection_Debug:bool = False
def GetInternalReflectionDebug() -> bool:
return _Internal_Reflection_Debug and GetInternalDebug()
def SetInternalReflectionDebug(debug:bool) -> None:
global _Internal_Reflection_Debug
_Internal_Reflection_Debug = debug
# 缓存get_type_from_string的结果
_type_string_cache: Dict[str, type] = {}
class ReflectionException(Exception):
def __init__(self, message:str):
self.message = f"{ConsoleFrontColor.RED}{message}{ConsoleFrontColor.RESET}"
super().__init__(self.message)
def String2Type(type_string:str) -> type:
"""
根据字符串生成类型,使用缓存提高性能
"""
# 检查缓存
if type_string in _type_string_cache:
return _type_string_cache[type_string]
result = None
# 检查内置类型映射
if type_string in type_symbols:
result = type_symbols[type_string]
# 从内置类型中获取
elif type_string in dir(types):
result = getattr(types, type_string)
# 从当前全局命名空间获取
elif type_string in globals():
result = globals().get(type_string)
# 从当前模块获取
elif type_string in dir(__import__(__name__)):
result = getattr(__import__(__name__), type_string)
# 尝试从其他模块获取
else:
try:
if '.' not in type_string:
raise ValueError(f"Empty module name, type_string is {type_string}")
module_name, _, class_name = type_string.rpartition('.')
if not module_name:
raise ValueError(f"Empty module name, type_string is {type_string}")
# 首先尝试直接获取模块
try:
module = sys.modules[module_name]
except KeyError:
# 模块未加载,需要导入
module = importlib.import_module(module_name)
result = getattr(module, class_name)
except (ImportError, AttributeError, ValueError) as ex:
raise TypeError(f"Cannot find type '{type_string}', type_string is <{type_string}>") from ex
# 更新缓存
if result is not None:
_type_string_cache[type_string] = result
return result
raise TypeError(f"Cannot find type '{type_string}', type_string is <{type_string}>")
@functools.lru_cache(maxsize=1024)
def StringWithModel2Type(type_string:str, module_name:str) -> type|None:
'''
根据字符串生成类型,带模块名参数,使用缓存
'''
# 检查内置类型映射
if type_string in type_symbols:
return type_symbols[type_string]
# 尝试从指定模块获取
try:
module = sys.modules.get(module_name)
if module and type_string in dir(module):
return getattr(module, type_string)
except (KeyError, AttributeError):
pass
# 尝试从类型模块获取
if type_string in dir(types):
return getattr(types, type_string)
return None
# 获取泛型参数
def GetGenericArgs(type_hint: type | Any) -> tuple[type | None, tuple[type, ...] | None]:
origin = get_origin(type_hint) # 获取原始类型
args = get_args(type_hint) # 获取泛型参数
if origin is None:
return None, None
return origin, args
def IsGeneric(type_hint: type | Any) -> bool:
return "__origin__" in dir(type_hint)
class _SpecialIndictaor:
pass
class ListIndictaor(_SpecialIndictaor):
elementType:type
def __init__(self, elementType:type):
self.elementType = elementType
def __repr__(self) -> str:
return f"ListIndictaor<elementType={self.elementType}>"
def __str__(self) -> str:
return self.__repr__()
def __hash__(self) -> int:
return hash(List[self.elementType])
class DictIndictaor(_SpecialIndictaor):
keyType:type
valueType:type
def __init__(self, keyType:type, valueType:type):
self.keyType = keyType
self.valueType = valueType
def __repr__(self) -> str:
return f"DictIndictaor<keyType={self.keyType}, valueType={self.valueType}>"
def __str__(self) -> str:
return self.__repr__()
def __hash__(self) -> int:
return hash(Dict[self.keyType, self.valueType])
class TupleIndictaor(_SpecialIndictaor):
elementTypes:Tuple[type, ...]
def __init__(self, *elementTypes:type):
self.elementTypes = elementTypes
def __repr__(self) -> str:
return f"TupleIndictaor<{', '.join(map(str, self.elementTypes))}>"
def __str__(self) -> str:
return self.__repr__()
def __hash__(self) -> int:
return hash(Tuple[self.elementTypes])
class SetIndictaor(_SpecialIndictaor):
elementType:type
def __init__(self, elementType:type):
self.elementType = elementType
def __repr__(self) -> str:
return f"SetIndictaor<elementType={self.elementType}>"
def __str__(self) -> str:
return self.__repr__()
def __hash__(self) -> int:
return hash(Set[self.elementType])
# 添加记忆化装饰器
def memoize(func):
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
return wrapper
# 优化to_type函数
@memoize
def ToType(
typen: type|Any|str,
*,
module_name: str|None=None
) -> type|List[type]|_SpecialIndictaor:
# 快速路径:如果已经是类型,直接返回
if isinstance(typen, type):
return typen
elif isinstance(typen, _SpecialIndictaor):
return typen
elif isinstance(typen, str):
# 快速路径:检查字符串的格式
if '.' in typen:
# 可能是带模块名的类型
try:
module_parts = typen.split('.')
# 尝试从sys.modules中获取模块
current_module = sys.modules
for part in module_parts[:-1]:
if part in current_module:
current_module = current_module[part]
else:
# 需要导入模块
current_module = importlib.import_module('.'.join(module_parts[:-1]))
break
# 获取类型
if isinstance(current_module, dict):
# 从字典中获取
result = current_module.get(module_parts[-1])
if isinstance(result, type):
return result
else:
# 从模块对象中获取
result = getattr(current_module, module_parts[-1], None)
if isinstance(result, type):
return result
except (ImportError, AttributeError):
pass
# 回退到一般处理
import sys
if not all(c.isalnum() or c == '.' for c in typen):
raise ValueError(f"Invalid type string: {typen}, only alphanumeric characters and dots are allowed")
type_components = typen.split(".")
type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None)
type_final = type_components[-1]
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
f"typen: {typen}, type_components: {type_components}")
if type_module is not None:
return sys.modules[type_module].__dict__[type_final]
else:
for module in sys.modules.values():
if type_final in module.__dict__:
return module.__dict__[type_final]
return String2Type(typen)
elif IsUnion(typen):
uTypes = GetUnionTypes(typen)
uTypes = [uType for uType in uTypes if uType is not type(None)]
if len(uTypes) == 1:
return uTypes[0]
elif len(uTypes) == 0:
return type(None)
else:
return uTypes
elif hasattr(typen, '__origin__'):
oType = get_origin(typen)
if oType is list:
return ListIndictaor(get_args(typen)[0])
elif oType is dict:
return DictIndictaor(get_args(typen)[0], get_args(typen)[1])
elif oType is tuple:
return TupleIndictaor(*get_args(typen))
elif oType is set:
return SetIndictaor(get_args(typen)[0])
else:
return oType
else:
return type(typen)
def TryToType(typen:type|Any|str, *, module_name:str|None=None) -> type|List[type]|_SpecialIndictaor|None:
try:
return ToType(typen, module_name=module_name)
except Exception:
return None
def IsUnion(type_hint: type | Any) -> bool:
return "__origin__" in dir(type_hint) and type_hint.__origin__ == Union
def GetUnionTypes(type_hint: type | Any) -> List[type]:
return [t for t in type_hint.__args__]
class TypeVarIndictaor:
pass
class AnyVarIndicator:
pass
# 优化decay_type函数
@memoize
def DecayType(
type_hint: type|Any,
*,
module_name: str|None=None
) -> type|List[type]|_SpecialIndictaor:
# 快速路径:直接判断常见类型
if isinstance(type_hint, (type, _SpecialIndictaor)):
return type_hint
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
result: type|List[type] = None
# 处理字符串类型
if isinstance(type_hint, str):
try:
result = ToType(type_hint, module_name=module_name)
except TypeError:
result = Any
# 处理forward reference
elif hasattr(type_hint, "__forward_arg__"):
result = ToType(type_hint.__forward_arg__, module_name=module_name)
# 处理type类型
elif type_hint is type:
result = type_hint
# 处理union类型
elif IsUnion(type_hint):
result = GetUnionTypes(type_hint)
# 处理TypeVar
elif isinstance(type_hint, TypeVar):
result = TypeVarIndictaor
# 处理泛型类型
elif IsGeneric(type_hint):
result = get_origin(type_hint)
else:
raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>")
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
return result
def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool:
'''
检查成员是否只在当前类中定义,而不是在父类中定义
'''
# 获取当前类的所有成员
current_members = dict(inspect.getmembers(current_class))
if member_name not in current_members:
return False
# 获取父类的所有成员
for baseType in current_class.__bases__:
parent_members = dict(inspect.getmembers(baseType))
if member_name in parent_members:
return False
return True
class BaseInfo(BaseModel):
def SymbolName(self) -> str:
return "BaseInfo"
def ToString(self) -> str:
return self.SymbolName()
class MemberInfo(BaseInfo):
_MemberName: str = PrivateAttr(default="")
_ParentType: Optional[type] = PrivateAttr(default=None)
_IsStatic: bool = PrivateAttr(default=False)
_IsPublic: bool = PrivateAttr(default=False)
def __init__(self, name:str, ctype:Optional[type], is_static:bool, is_public:bool, **kwargs):
super().__init__(**kwargs)
self._MemberName = name
self._ParentType = ctype
self._IsStatic = is_static
self._IsPublic = is_public
@property
def MemberName(self) -> str:
return self._MemberName
@property
def ParentType(self) -> Optional[type]:
return self._ParentType
@property
def IsStatic(self) -> bool:
return self._IsStatic
@property
def IsPublic(self) -> bool:
return self._IsPublic
@override
def __repr__(self) -> str:
return f"<{self.MemberName}>"
@override
def __str__(self) -> str:
return f"{self.MemberName}"
@override
def SymbolName(self) -> str:
return "MemberInfo"
@override
def ToString(self) -> str:
return f"MemberInfo<name={self.MemberName}, ctype={self.ParentType}, " \
f"{'static' if self.IsStatic else 'instance'}, {'public' if self.IsPublic else 'private'}>"
class ValueInfo(BaseInfo):
_RealType: Optional[Any] = PrivateAttr(default=None)
_IsPrimitive: bool = PrivateAttr(default=False)
_IsValueType: bool = PrivateAttr(default=False)
_IsCollection: bool = PrivateAttr(default=False)
_IsDictionary: bool = PrivateAttr(default=False)
_IsTuple: bool = PrivateAttr(default=False)
_IsSet: bool = PrivateAttr(default=False)
_IsList: bool = PrivateAttr(default=False)
_IsUnsupported: bool = PrivateAttr(default=False)
_GenericArgs: List[type] = PrivateAttr(default=[])
@property
def IsUnion(self) -> bool:
return IsUnion(self._RealType)
@property
def RealType(self) -> type|Any:
return self._RealType
@property
def IsCollection(self) -> bool:
return self._IsCollection
@property
def IsPrimitive(self) -> bool:
return self._IsPrimitive
@property
def IsValueType(self) -> bool:
return self._IsValueType
@property
def IsDictionary(self) -> bool:
return self._IsDictionary
@property
def IsTuple(self) -> bool:
return self._IsTuple
@property
def IsSet(self) -> bool:
return self._IsSet
@property
def IsList(self) -> bool:
return self._IsList
@property
def IsUnsupported(self) -> bool:
return self._IsUnsupported
@property
def GenericArgs(self) -> List[type]:
return self._GenericArgs
@property
def IsGeneric(self) -> bool:
return len(self._GenericArgs) > 0
@property
def Module(self) -> Optional[Dict[str, type]]:
return sys.modules[self.RealType.__module__]
@property
def ModuleName(self) -> str:
return self.RealType.__module__
def __init__(self, metaType:type|Any, generic_args:List[type]=[], **kwargs) -> None:
super().__init__(**kwargs)
self._RealType = metaType
if GetInternalReflectionDebug() and len(generic_args) > 0:
PrintColorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
f"metaType={metaType}, generic_args={generic_args}")
self._GenericArgs = generic_args
if not isinstance(metaType, type):
return
self._IsPrimitive = (
issubclass(metaType, int) or
issubclass(metaType, float) or
issubclass(metaType, str) or
issubclass(metaType, bool) or
issubclass(metaType, complex) #or
# issubclass(metaType, tuple) or
# issubclass(metaType, set) or
# issubclass(metaType, list) or
# issubclass(metaType, dict)
)
self._IsValueType = (
issubclass(metaType, int) or
issubclass(metaType, float) or
issubclass(metaType, str) or
issubclass(metaType, bool) or
issubclass(metaType, complex)
)
self._IsCollection = (
issubclass(metaType, list) or
issubclass(metaType, dict) or
issubclass(metaType, tuple) or
issubclass(metaType, set)
)
self._IsDictionary = (
issubclass(metaType, dict)
)
self._IsTuple = (
issubclass(metaType, tuple)
)
self._IsSet = (
issubclass(metaType, set)
)
self._IsList = (
issubclass(metaType, list)
)
def Verify(self, valueType:type) -> bool:
if self.IsUnsupported:
raise ReflectionException(f"Unsupported type: {self.RealType}")
if valueType is type(None):
return True
if self.IsUnion:
return any(ValueInfo(uType).Verify(valueType) for uType in GetUnionTypes(self.RealType))
elif self.RealType is Any:
return True
elif self.RealType is type(None):
return valueType is None or valueType is type(None)
else:
try:
return issubclass(valueType, self.RealType)
except Exception as e:
raise ReflectionException(f"Verify type {valueType} with {self.RealType}: \n{e}") from e
def DecayToList(self) -> List[Self]:
result:List[Self] = []
if self.IsUnion:
for uType in GetUnionTypes(self.RealType):
result.extend(ValueInfo(uType).DecayToList())
else:
result.append(self)
result = list(dict.fromkeys(result).keys())
return result
@override
def __repr__(self) -> str:
generic_args = ", ".join(self._GenericArgs)
return f"ValueInfo<{self.RealType}{f'[{generic_args}]' if generic_args else ''}>"
@override
def SymbolName(self) -> str:
return "ValueInfo"
@override
def ToString(self) -> str:
generic_args = ", ".join(self._GenericArgs)
return f"<{self.RealType}{f'[{generic_args}]' if generic_args else ''}>"
@staticmethod
def Create(
metaType: type|Any,
*,
module_name: Optional[str] = None,
SelfType: type|Any|None = None,
**kwargs
) -> 'ValueInfo':
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
f"metaType={metaType}, SelfType={SelfType}")
if isinstance(metaType, type):
if metaType is list:
return ValueInfo(list, [Any])
elif metaType is dict:
return ValueInfo(dict, [Any, Any])
elif metaType is tuple:
return ValueInfo(tuple, [])
elif metaType is set:
return ValueInfo(set, [Any])
else:
return ValueInfo(metaType, **kwargs)
elif isinstance(metaType, str):
type_ = TryToType(metaType, module_name=module_name)
if type_ is None:
return ValueInfo(metaType, **kwargs)
else:
return ValueInfo(type_, **kwargs)
elif metaType is Self:
if SelfType is None:
raise ReflectionException("SelfType is required when metaType is <Self>")
return ValueInfo.Create(SelfType, **kwargs)
elif isinstance(metaType, TypeVar):
gargs = GetGenericArgs(metaType)
if len(gargs) == 1:
return ValueInfo(gargs[0], **kwargs)
else:
return ValueInfo(Any, **kwargs)
elif hasattr(metaType, '__origin__'):
oType = get_origin(metaType)
if oType is list:
return ValueInfo(list, [get_args(metaType)[0]])
elif oType is dict:
return ValueInfo(dict, [get_args(metaType)[0], get_args(metaType)[1]])
elif oType is tuple:
return ValueInfo(tuple, list(get_args(metaType)))
elif oType is set:
return ValueInfo(set, [get_args(metaType)[0]])
return ValueInfo(metaType, **kwargs)
class FieldInfo(MemberInfo):
_MetaType: Optional[ValueInfo] = PrivateAttr(default=None)
def __init__(
self,
metaType: Any,
name: str,
ctype: type,
is_static: bool,
is_public: bool,
module_name: Optional[str] = None,
selfType: type|Any|None = None
):
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ")
super().__init__(
name = name,
ctype = ctype,
is_static = is_static,
is_public = is_public,
)
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}")
@property
def IsUnion(self) -> bool:
return self.ValueType.IsUnion
@property
def FieldName(self) -> str:
'''
字段名称
'''
return self.MemberName
@property
def ValueType(self) -> ValueInfo:
return self._MetaType
@property
def FieldType(self):
'''
字段类型
'''
return self.ValueType.RealType
def Verify(self, valueType:type) -> bool:
return self.ValueType.Verify(valueType)
def GetValue(self, obj:Any) -> Any:
if self.IsStatic:
return getattr(self.ParentType, self.MemberName)
else:
if not isinstance(obj, self.ParentType):
raise TypeError(f"{ConsoleFrontColor.RED}Field {ConsoleFrontColor.LIGHTBLUE_EX}{self.MemberName}"\
f"{ConsoleFrontColor.RED} , parent type mismatch, expected {self.ParentType}, got {type(obj)}"\
f"{ConsoleFrontColor.RESET}")
return getattr(obj, self.MemberName)
def SetValue(self, obj:Any, value:Any) -> None:
if self.IsStatic:
if self.Verify(type(value)):
setattr(self.ParentType, self.MemberName, value)
else:
raise TypeError(f"Value type mismatch, expected {self.MetaType.RealType}, got {type(value)}")
else:
if not isinstance(obj, self.ParentType):
raise TypeError(f"Parent type mismatch, expected {self.ParentType}, got {type(obj)}")
if self.Verify(type(value)):
setattr(obj, self.MemberName, value)
else:
raise TypeError(f"{ConsoleFrontColor.RED}Field {ConsoleFrontColor.LIGHTBLUE_EX}{self.MemberName}"\
f"{ConsoleFrontColor.RED} , value type mismatch, expected \"{self.FieldType}\""\
f", got {type(value)}{ConsoleFrontColor.RESET}")
@override
def __repr__(self) -> str:
return f"<{self.MemberName} type={self.FieldType}>"
@override
def SymbolName(self) -> str:
return "FieldInfo"
@override
def ToString(self) -> str:
return f"FieldInfo<name={self.MemberName}, type={self.FieldType}, ctype={self.ParentType}, "\
f"{("generics="+str(self.ValueType.GenericArgs)+", ") if self.ValueType.IsGeneric else ''}" \
f"{'static' if self.IsStatic else 'instance'}, {'public' if self.IsPublic else 'private'}>"
class ParameterInfo(BaseInfo):
_MetaType: Optional[ValueInfo] = PrivateAttr(default=None)
_ParameterName: str = PrivateAttr(default="")
_IsOptional: bool = PrivateAttr(default=False)
_DefaultValue: Any = PrivateAttr(default=None)
def __init__(
self,
metaType: Any,
name: str,
is_optional: bool,
default_value: Any,
module_name: Optional[str] = None,
selfType: type|Any|None = None,
**kwargs
):
super().__init__(**kwargs)
self._ParameterName = name
self._IsOptional = is_optional
self._DefaultValue = default_value
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
@property
def ValueType(self):
return self._MetaType
@property
def ParameterName(self) -> str:
return self._ParameterName
@property
def ParameterType(self):
return self._MetaType.RealType
@property
def IsOptional(self) -> bool:
return self._IsOptional
@property
def DefaultValue(self) -> Any:
return self._DefaultValue
def Verify(self, valueType:type) -> bool:
return self._MetaType.Verify(valueType)
@override
def __repr__(self) -> str:
return f"<{self.ParameterName}>"
@override
def SymbolName(self) -> str:
return "ParameterInfo"
@override
def ToString(self) -> str:
return f"ParameterInfo<name={self.ParameterName}, type={self.ParameterType}, " \
f"{'optional' if self.IsOptional else 'required'}, default={self.DefaultValue}>"
class MethodInfo(MemberInfo):
_ReturnType: Optional[ValueInfo] = PrivateAttr(default=None)
_Parameters: List[ParameterInfo] = PrivateAttr(default=[])
_PositionalParameters: List[ParameterInfo] = PrivateAttr(default=[])
_KeywordParameters: List[ParameterInfo] = PrivateAttr(default=[])
_IsClassMethod: bool = PrivateAttr(default=False)
def __init__(
self,
return_type: Any,
parameters: List[ParameterInfo],
positional_parameters: List[ParameterInfo],
keyword_parameters: List[ParameterInfo],
name: str,
ctype: type,
is_static: bool,
is_public: bool,
is_class_method: bool,
):
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})")
MemberInfo.__init__(self, name, ctype, is_static, is_public)
self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType)
self._Parameters = parameters
self._PositionalParameters = positional_parameters
self._KeywordParameters = keyword_parameters
self._IsClassMethod = is_class_method
@property
def ReturnType(self) -> ValueInfo:
return self._ReturnType.RealType
@property
def Parameters(self) -> List[ParameterInfo]:
return self._Parameters
@property
def PositionalParameters(self) -> List[ParameterInfo]:
return self._PositionalParameters
@property
def KeywordParameters(self) -> List[ParameterInfo]:
return self._KeywordParameters
@property
def IsClassMethod(self) -> bool:
return self._IsClassMethod
@overload
def Invoke(self, obj:object, *args, **kwargs) -> object:
'''
调用实例方法
'''
...
@overload
def Invoke(self, obj:type, *args, **kwargs) -> object:
'''
调用类方法
'''
...
@overload
def Invoke(self, noneObj:Literal[None]|None, *args, **kwargs) -> object:
'''
调用静态方法
'''
...
def Invoke(self, obj:object|type, *args, **kwargs) -> object:
if not self.IsStatic and obj is None:
raise TypeError("Object is None")
if not self.IsStatic and not self.IsClassMethod and not isinstance(obj, self.ParentType):
raise TypeError(f"Parent type mismatch, expected {self.ParentType}, got {type(obj)}")
if self.IsClassMethod and not isinstance(obj, type):
raise TypeError(f"Class method expected type, got {type(obj)}: {obj}")
result = None
if self.IsStatic:
method = getattr(self.ParentType, self.MemberName)
if method is None:
raise AttributeError(f"{self.ParentType} type has no method '{self.MemberName}'")
result = method(*args, **kwargs)
elif self.IsClassMethod:
method = getattr(obj, self.MemberName)
if method is None:
raise AttributeError(f"{obj} class has no method '{self.MemberName}'")
result = method(*args, **kwargs)
else:
method = getattr(obj, self.MemberName)
if method is None:
raise AttributeError(f"{self.ParentType} type has no method '{self.MemberName}'")
result = method(*args, **kwargs)
return result
@override
def SymbolName(self) -> str:
return "MethodInfo"
@override
def ToString(self) -> str:
return f"MethodInfo<name={self.MemberName}, return={self.ReturnType}, ctype={self.ParentType}" \
f"{', static' if self.IsStatic else ''}{', class' if self.IsClassMethod else ''}, {'public' if self.IsPublic else 'private'}, " \
f"params_count={len(self.Parameters)}>"
@classmethod
def Create(
cls,
name: str,
method: Callable,
ctype: Optional[type] = None,
module_name: Optional[str] = None
) -> 'MethodInfo':
'''
创建MethodInfo对象
name: 方法名
method: 方法对象
ctype: 方法所属的类
module_name: 模块名
'''
# 获取方法签名
sig = inspect.signature(method)
is_static = isinstance(method, staticmethod)
is_class_method = isinstance(method, classmethod)
is_public = (name.startswith("__") and name.endswith("__")) or not name.startswith('_')
# 构建参数列表
parameters:List[ParameterInfo] = []
positional_parameters:List[ParameterInfo] = []
keyword_parameters:List[ParameterInfo] = []
for param_name, param in sig.parameters.items():
if param_name in ('self', 'cls'):
continue
ptype = param.annotation if param.annotation != inspect.Parameter.empty else Any
ptype = ptype if isinstance(ptype, type) else Any
param_info = ParameterInfo(
metaType = ptype,
name = param_name,
is_optional = param.default != inspect.Parameter.empty,
default_value = param.default if param.default != inspect.Parameter.empty else None,
module_name = module_name,
selfType=ctype
)
parameters.append(param_info)
if param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD):
positional_parameters.append(param_info)
elif param.kind == inspect.Parameter.KEYWORD_ONLY:
keyword_parameters.append(param_info)
# 构建方法信息
return MethodInfo(
return_type = sig.return_annotation if sig.return_annotation != inspect.Signature.empty else Any,
parameters = parameters,
positional_parameters = positional_parameters,
keyword_parameters = keyword_parameters,
name = name,
ctype = ctype,
is_static = is_static,
is_public = is_public,
is_class_method = is_class_method
)
class RefTypeFlag(IntFlag):
Static:int = 0b00000001
Instance:int = 0b00000010
Public:int = 0b00000100
Private:int = 0b00001000
Default:int = 0b00010000
Method:int = 0b00100000
Field:int = 0b01000000
Special:int = 0b10000000
All:int = 0b11111111
_RefTypeLock:Dict[Any|type|_SpecialIndictaor, threading.Lock] = {}
class RefType(ValueInfo):
_FieldInfos: List[FieldInfo] = PrivateAttr()
_MethodInfos: List[MethodInfo] = PrivateAttr()
_MemberNames: List[str] = PrivateAttr()
_BaseTypes: List[Self] = PrivateAttr(default=[])
_initialized: bool = PrivateAttr(default=False)
_BaseMemberNamesSet: Set[str] = PrivateAttr(default_factory=set)
_member_cache: Dict[Tuple[str, RefTypeFlag], Optional[MemberInfo]] = PrivateAttr(default_factory=dict)
def __init__(self, metaType:type|_SpecialIndictaor):
if isinstance(metaType, ListIndictaor):
super().__init__(list, generic_args=[metaType.elementType])
metaType = list
elif isinstance(metaType, DictIndictaor):
super().__init__(dict, generic_args=[metaType.keyType, metaType.valueType])
metaType = dict
elif isinstance(metaType, TupleIndictaor):
super().__init__(tuple, generic_args=list(metaType.elementTypes))
metaType = tuple
elif isinstance(metaType, SetIndictaor):
super().__init__(set, generic_args=[metaType.elementType])
metaType = set
elif IsGeneric(metaType):
raise NotImplementedError("Generic type is not supported")
else:
super().__init__(metaType)
self._FieldInfos = []
self._MethodInfos = []
self._MemberNames = []
self._BaseMemberNamesSet = set()
self._member_cache = {}
# 延迟初始化标志
self._initialized = False
def _ensure_initialized(self):
"""确保完全初始化,实现延迟加载"""
# 初始化锁, 防止同个RefType在多线程中被重复初始化
if self.RealType not in _RefTypeLock:
_RefTypeLock[self.RealType] = threading.Lock()
lock_guard(_RefTypeLock[self.RealType])
if self._initialized:
return
metaType = self.RealType
# 初始化基类列表 - 只初始化一次
if self._BaseTypes is None:
self._BaseTypes = []
for baseType in metaType.__bases__:
if baseType == metaType: # 避免循环引用
continue
self._BaseTypes.append(TypeManager.GetInstance().CreateOrGetRefType(baseType))
# 如果BaseMemberNamesSet为空则填充它
if not self._BaseMemberNamesSet:
# 使用更高效的方式收集基类成员名称
for baseType in self._BaseTypes:
# 确保基类已初始化
baseType._ensure_initialized()
# 直接合并集合,而不是逐个添加
self._BaseMemberNamesSet.update(baseType._MemberNames)
self._BaseMemberNamesSet.update(baseType._BaseMemberNamesSet)
class_var = metaType.__dict__
# 只在需要时获取类型注解
annotations_dict = {}
try:
annotations_dict = get_type_hints(metaType)
except (TypeError, NameError):
# 类型提示获取失败时,使用空字典
pass
# 减少函数调用开销的辅助函数,改为内联处理
methods_info = []
method_names = []
fields_info = self._FieldInfos.copy() # 保留extensionFields
field_names = []
# 一次性收集所有成员,避免多次遍历
members = inspect.getmembers(metaType)
# 处理方法
for name, member in members:
if name not in self._BaseMemberNamesSet:
if inspect.ismethod(member) or inspect.isfunction(member):
methods_info.append(MethodInfo.Create(name, member, ctype=metaType, module_name=self.ModuleName))
method_names.append(name)
# 处理字段 (非方法成员)
elif not name.startswith('__'): # 排除魔术方法相关的属性
is_static = name in class_var
is_public = (name.startswith('__') and name.endswith('__')) or not name.startswith('_')
fieldType = annotations_dict.get(name, Any)
field_info = FieldInfo(
metaType = fieldType,
name = name,
ctype = metaType,
is_static = is_static,
is_public = is_public,
module_name = self.ModuleName,
selfType=metaType
)
fields_info.append(field_info)
field_names.append(name)
# 处理BaseModel字段 - 这些通常有特殊的处理
if issubclass(metaType, BaseModel):
try:
fields = getattr(metaType, 'model_fields', getattr(metaType, '__pydantic_fields__', {}))
for field_name, model_field in fields.items():
if field_name not in self._BaseMemberNamesSet and field_name not in field_names:
fieldType = model_field.annotation if model_field.annotation is not None else Any
is_public = not getattr(model_field, 'exclude', False)
field_info = FieldInfo(
metaType=fieldType,
name=field_name,
ctype=metaType,
is_public=is_public,
is_static=False,
module_name=self.ModuleName,
selfType=metaType
)
fields_info.append(field_info)
field_names.append(field_name)
except (AttributeError, TypeError):
pass # 忽略BaseModel相关错误
# 处理注释中的字段 - 只处理尚未添加的字段
for name, annotation in annotations_dict.items():
if name not in self._BaseMemberNamesSet and name not in field_names and not name.startswith('__'):
field_info = FieldInfo(
metaType=DecayType(annotation),
name=name,
ctype=metaType,
is_static=False,
is_public=not name.startswith('_'),
module_name=self.ModuleName,
selfType=metaType
)
fields_info.append(field_info)
field_names.append(name)
# 更新成员列表
self._MethodInfos = methods_info
self._FieldInfos = fields_info
self._MemberNames = method_names + field_names
self._initialized = True
def _where_member(self, member:MemberInfo, flag:RefTypeFlag) -> bool:
# 如果是默认标志,直接根据常见条件判断,避免位运算
if flag == RefTypeFlag.Default:
if isinstance(member, MethodInfo):
return member.IsPublic and (member.IsInstance or member.IsStatic)
elif isinstance(member, FieldInfo):
return member.IsPublic and member.IsInstance
return False
# 否则进行完整的标志检查
stats = True
if member.IsStatic:
stats &= (flag & RefTypeFlag.Static != 0)
else:
stats &= (flag & RefTypeFlag.Instance != 0)
if member.IsPublic:
stats &= (flag & RefTypeFlag.Public != 0)
else:
stats &= (flag & RefTypeFlag.Private != 0)
if isinstance(member, MethodInfo):
stats &= (flag & RefTypeFlag.Method != 0)
elif isinstance(member, FieldInfo):
stats &= (flag & RefTypeFlag.Field != 0)
if member.MemberName.startswith('__') and member.MemberName.endswith('__'):
stats &= (flag & RefTypeFlag.Special != 0)
return stats
# 修改获取成员方法,使用缓存
def GetField(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[FieldInfo]:
cache_key = (name, flags)
if cache_key in self._member_cache:
return self._member_cache[cache_key]
result = next((field for field in self.GetFields(flags) if field.MemberName == name), None)
self._member_cache[cache_key] = result
return result
def GetMethod(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[MethodInfo]:
cache_key = (name, flags)
if cache_key in self._member_cache:
return self._member_cache[cache_key]
result = next((method for method in self.GetMethods(flags) if method.MemberName == name), None)
self._member_cache[cache_key] = result
return result
def GetMember(self, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> Optional[MemberInfo]:
cache_key = (name, flags)
if cache_key in self._member_cache:
return self._member_cache[cache_key]
result = next((member for member in self.GetMembers(flags) if member.MemberName == name), None)
self._member_cache[cache_key] = result
return result
def GetFieldValue[T](self, obj:object, name:str, flags:RefTypeFlag=RefTypeFlag.Default) -> T:
field = self.GetField(name, flags)
if field is not None:
return field.GetValue(obj)
else:
raise ReflectionException(f"Field {name} not found")
def SetFieldValue[T](self, obj:object, name:str, value:T, flags:RefTypeFlag=RefTypeFlag.Default) -> None:
field = self.GetField(name, flags)
if field is not None:
field.SetValue(obj, value)
else:
raise ReflectionException(f"Field {name} not found")
def InvokeMethod[T](self, obj:object, name:str, flags:RefTypeFlag=RefTypeFlag.Default, *args, **kwargs) -> T:
method = self.GetMethod(name, flags)
if method is not None:
return method.Invoke(obj, *args, **kwargs)
else:
raise ReflectionException(f"Method {name} not found")
def CreateInstance(self, *args, **kwargs) -> object:
return self.RealType(*args, **kwargs)
@override
def __repr__(self) -> str:
return f"RefType<{self.RealType}{f'<{self.GenericArgs}>' if self.IsGeneric else ''}>"
@override
def SymbolName(self) -> str:
return "RefType"
@override
def ToString(self) -> str:
return f"RefType<type={self.RealType}, generic={self.GenericArgs}>"
def Print2Str(self, verbose:bool=False, flags:RefTypeFlag=RefTypeFlag.Default) -> str:
fields: List[str] = []
methods: List[str] = []
for field in self.GetFields(flags):
fields.append(f"{ConsoleFrontColor.GREEN if field.IsPublic else ConsoleFrontColor.RED}"\
f"{field.ToString() if verbose else field.MemberName}{ConsoleFrontColor.RESET}")
for method in self.GetMethods(flags):
methods.append(f"{ConsoleFrontColor.YELLOW if method.IsPublic else ConsoleFrontColor.RED}"\
f"{method.ToString() if verbose else method.MemberName}{ConsoleFrontColor.RESET}")
return f"RefType<type={self.RealType}{', fields=' if len(fields)!=0 else ''}{', '.join(fields)}{ConsoleFrontColor.RESET}"\
f"{', methods=' if len(methods)!=0 else ''}{', '.join(methods)}{ConsoleFrontColor.RESET}>"
def Print2Tree(self, indent:int=4) -> str:
type_set: set = set()
def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]:
if currentType.IsPrimitive:
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return f"{currentType.RealType}"
elif currentType.RealType in type_set:
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return {
"type": f"{currentType.RealType}",
"value": { field.FieldName: f"{field.FieldType}" for field in currentType.GetFields() }
}
else:
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
type_set.add(currentType.RealType)
value = {}
fields = currentType.GetFields()
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
for field in fields:
value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType))
return {
"type": f"{currentType.RealType}",
"value": value
}
return json.dumps(dfs(self), indent=indent)
@override
def __hash__(self) -> int:
"""使RefType对象可哈希基于RealType和GenericArgs的哈希值"""
return hash((self.RealType, tuple(self.GenericArgs)))
@override
def __eq__(self, other) -> bool:
"""比较两个RefType对象是否相等基于RealType的比较"""
if not isinstance(other, RefType):
return False
return self.RealType == other.RealType
# 添加新的优化方法,避免重复检查
def _InitBaseTypesIfNeeded(self):
"""初始化基类类型,只在需要时执行"""
if self._BaseTypes is None:
self._BaseTypes = []
metaType = self.RealType
for baseType in metaType.__bases__:
# 避免循环引用如果baseType是自己则跳过
if baseType == metaType:
continue
self._BaseTypes.append(TypeManager.GetInstance().CreateOrGetRefType(baseType))
# 确保正确地实现所有GetBase*方法
@functools.lru_cache(maxsize=128)
def _GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetFields(flag))
return result
def GetBaseFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
return self._GetBaseFields(flag)
@functools.lru_cache(maxsize=128)
def _GetAllBaseFields(self) -> List[FieldInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetAllFields())
return result
def GetAllBaseFields(self) -> List[FieldInfo]:
return self._GetAllBaseFields()
# 修改所有的GetBase*方法
@functools.lru_cache(maxsize=128)
def _GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetMethods(flag))
return result
def GetBaseMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
return self._GetBaseMethods(flag)
@functools.lru_cache(maxsize=128)
def _GetAllBaseMethods(self) -> List[MethodInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetAllMethods())
return result
def GetAllBaseMethods(self) -> List[MethodInfo]:
return self._GetAllBaseMethods()
@functools.lru_cache(maxsize=128)
def _GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetMembers(flag))
return result
def GetBaseMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
return self._GetBaseMembers(flag)
@functools.lru_cache(maxsize=128)
def _GetAllBaseMembers(self) -> List[MemberInfo]:
if self._BaseTypes is None:
self._InitBaseTypesIfNeeded()
result = []
for baseType in self._BaseTypes:
result.extend(baseType.GetAllMembers())
return result
def GetAllBaseMembers(self) -> List[MemberInfo]:
return self._GetAllBaseMembers()
def GetFields(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[FieldInfo]:
self._ensure_initialized()
if flag == RefTypeFlag.Default:
result = [field for field in self._FieldInfos
if self._where_member(field, RefTypeFlag.Field|RefTypeFlag.Public|RefTypeFlag.Instance)]
else:
result = [field for field in self._FieldInfos if self._where_member(field, flag)]
result.extend(self.GetBaseFields(flag))
return result
def GetAllFields(self) -> List[FieldInfo]:
self._ensure_initialized()
result = self._FieldInfos.copy()
result.extend(self.GetAllBaseFields())
return result
def GetMethods(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MethodInfo]:
self._ensure_initialized()
if flag == RefTypeFlag.Default:
result = [method for method in self._MethodInfos
if self._where_member(method, RefTypeFlag.Method|RefTypeFlag.Public|RefTypeFlag.Instance|RefTypeFlag.Static)]
else:
result = [method for method in self._MethodInfos if self._where_member(method, flag)]
result.extend(self.GetBaseMethods(flag))
return result
def GetAllMethods(self) -> List[MethodInfo]:
self._ensure_initialized()
result = self._MethodInfos.copy()
result.extend(self.GetAllBaseMethods())
return result
def GetMembers(self, flag:RefTypeFlag=RefTypeFlag.Default) -> List[MemberInfo]:
self._ensure_initialized()
if flag == RefTypeFlag.Default:
result = [member for member in self._FieldInfos + self._MethodInfos
if self._where_member(member, RefTypeFlag.Public|RefTypeFlag.Instance|RefTypeFlag.Field|RefTypeFlag.Method)]
else:
result = [member for member in self._FieldInfos + self._MethodInfos if self._where_member(member, flag)]
result.extend(self.GetBaseMembers(flag))
return result
def GetAllMembers(self) -> List[MemberInfo]:
self._ensure_initialized()
result = self._FieldInfos + self._MethodInfos
result.extend(self.GetAllBaseMembers())
return result
type RTypen[_T] = RefType
'''
RTypen[T] 是 T 类型的 RefType
'''
_Internal_TypeManager:Optional['TypeManager'] = None
class TypeManager(BaseModel):
_RefTypes: Dict[type|_SpecialIndictaor, RefType] = PrivateAttr(default_factory=dict)
_is_preheated: bool = PrivateAttr(default=False)
_weak_refs: Dict[int, "weakref.ref[RefType]"] = PrivateAttr(default_factory=dict) # 使用真正的弱引用
_type_name_cache: Dict[str, type] = PrivateAttr(default_factory=dict) # 类型名称到类型的缓存
_string_to_type_cache: Dict[str, Any] = PrivateAttr(default_factory=dict) # 字符串到类型的缓存
@classmethod
def GetInstance(cls) -> 'TypeManager':
global _Internal_TypeManager
if _Internal_TypeManager is None:
_Internal_TypeManager = cls()
_Internal_TypeManager._preheat_cache()
return _Internal_TypeManager
def _preheat_cache(self):
"""预热缓存为常用类型预先创建RefType"""
if self._is_preheated:
return
# 常用的基础类型列表
common_types = [
int, float, str, bool, list, dict, tuple, set,
object, type, None.__class__, Exception, BaseModel
]
# 预加载的类型和它们之间常见的关系,减少后续运行时计算
for t in common_types:
self.CreateRefType(t)
# 同时缓存类型名称到类型的映射
self._type_name_cache[t.__name__] = t
# 也缓存类型字符串到类型的映射
self._string_to_type_cache[t.__name__] = t
# 缓存模块全限定名
if t.__module__ != 'builtins':
full_name = f"{t.__module__}.{t.__name__}"
self._string_to_type_cache[full_name] = t
self._is_preheated = True
def AllRefTypes(self) -> Tuple[RefType, ...]:
return tuple(self._RefTypes.values())
@staticmethod
#@functools.lru_cache(maxsize=256)
def _TurnToType(data:Any, module_name:Optional[str]=None) -> type|_SpecialIndictaor:
"""将任意数据转换为类型,增加缓存以提高性能"""
metaType:type|_SpecialIndictaor = None
# 快速路径:如果已经是类型,直接返回
if isinstance(data, type) or isinstance(data, _SpecialIndictaor):
return data
# 处理字符串类型
if isinstance(data, str):
# 尝试使用模块名解析类型
if module_name is not None:
try:
return sys.modules[module_name].__dict__[data]
except (KeyError, AttributeError):
pass
# 尝试使用to_type函数
try:
return ToType(data, module_name=module_name)
except Exception:
pass
# 尝试使用try_to_type函数作为回退
metaType = TryToType(data, module_name=module_name)
if metaType is None or isinstance(metaType, list):
metaType = data
return metaType
@overload
def CreateOrGetRefType(
self,
type_: type,
module_name: Optional[str] = None
) -> RefType:
...
@overload
def CreateOrGetRefType(
self,
obj: object,
module_name: Optional[str] = None
) -> RefType:
...
@overload
def CreateOrGetRefType(
self,
type_str: str,
module_name: Optional[str] = None
) -> RefType:
...
def CreateOrGetRefType(
self,
data,
module_name: Optional[str] = None
) -> RefType:
"""创建或获取RefType实例使用多级缓存提高性能"""
if data is None:
raise ReflectionException("data is None")
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache:
data = self._string_to_type_cache[data]
# 获取或转换为类型
metaType:type|_SpecialIndictaor = TypeManager._TurnToType(data, module_name=module_name)
# 首先尝试从弱引用缓存中获取
type_id = id(metaType)
if type_id in self._weak_refs:
ref_type = self._weak_refs[type_id]()
if ref_type is not None:
return ref_type
else:
# 如果弱引用已被回收,则从字典中删除
del self._weak_refs[type_id]
# 然后尝试从常规缓存中获取
try:
ref_type = self._RefTypes[metaType]
# 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type)
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Get "\
f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\
f"{ConsoleFrontColor.RESET}{ref_type.ToString()}")
return ref_type
except KeyError:
ref_type = self.CreateRefType(metaType, module_name=module_name)
# 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type)
return ref_type
@overload
def CreateRefType(
self,
type_: type,
module_name: Optional[str] = None
) -> RefType:
...
@overload
def CreateRefType(
self,
obj: object,
module_name: Optional[str] = None
) -> RefType:
...
@overload
def CreateRefType(
self,
type_str: str,
module_name: Optional[str] = None
) -> RefType:
...
def CreateRefType(
self,
data,
module_name: Optional[str] = None
) -> RefType:
"""创建新的RefType实例"""
if data is None:
raise ReflectionException("data is None")
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache:
data = self._string_to_type_cache[data]
metaType:type|_SpecialIndictaor = TypeManager._TurnToType(data, module_name=module_name)
# 如果是字符串类型,缓存结果以供将来使用
if isinstance(data, str) and not data in self._string_to_type_cache:
self._string_to_type_cache[data] = metaType
try:
ref_type = RefType(metaType)
if GetInternalReflectionDebug():
PrintColorful(ConsoleFrontColor.RED, f"Create "\
f"{ConsoleFrontColor.RESET}{metaType} "\
f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}")
self._RefTypes[metaType] = ref_type
return ref_type
except Exception as e:
raise ReflectionException(f"Create RefType failed: {e}")
def CreateOrGetRefTypeFromType(self, type_:type|_SpecialIndictaor) -> RefType:
"""快速路径直接从类型创建或获取RefType"""
if type_ in self._RefTypes:
return self._RefTypes[type_]
else:
return self.CreateRefType(type_)
def CreateRefTypeFromType(self, type_:type|_SpecialIndictaor) -> RefType:
"""直接从类型创建RefType"""
result = self._RefTypes[type_] = RefType(type_)
return result