from .Config import * import json import shutil import pandas as pd import os import sys import pickle import zipfile import tarfile import base64 import hashlib import time import datetime import stat from typing import * from pathlib import Path try: from pydub import AudioSegment except ImportError as e: ImportingThrow(e, "File", ["pydub"]) try: from PIL import Image, ImageFile except ImportError as e: ImportingThrow(e, "File", ["Pillow"]) try: from docx import Document from docx.document import Document as DocumentObject except ImportError as e: ImportingThrow(e, "File", ["python-docx"]) from .String import Bytes2String def GetExtensionName(file:str): return os.path.splitext(file)[1][1:] def GetBaseFilename(file:str): return os.path.basename(file) dir_name_type = str file_name_type = str class FileOperationError(Exception): """文件操作异常基类""" pass class CompressionError(FileOperationError): """压缩操作异常""" pass class EncryptionError(FileOperationError): """加密操作异常""" pass class HashError(FileOperationError): """哈希计算异常""" pass class FileMonitorError(FileOperationError): """文件监控异常""" pass class BackupError(FileOperationError): """备份操作异常""" pass class PermissionError(FileOperationError): """权限操作异常""" pass from pydantic import BaseModel, GetCoreSchemaHandler, Field from pydantic_core import core_schema class ToolFile(BaseModel): OriginFullPath:str def __init__( self, filePath: Union[str, Self], ): filePath = os.path.expandvars(str(filePath)) if filePath[1:].startswith(":/") or filePath[1:].startswith(":\\"): filePath = os.path.abspath(filePath) super().__init__(OriginFullPath=filePath) def __del__(self): pass def __str__(self): return self.GetFullPath() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): return True def __or__(self, other): if other is None: return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}\\") else: # 不使用os.path.join,因为os.path.join存在如下机制 # 当参数路径中存在绝对路径风格时,会忽略前面的参数,例如: # os.path.join("E:/dev", "/analyze/") = "E:/analyze/" # 而我们需要的是 "E:/dev/analyze" first = self.GetFullPath().replace('/','\\').strip('\\') second = str(other).replace('/','\\') return ToolFile(f"{first}\\{second}") def __idiv__(self, other): temp = self.__or__(other) self.OriginFullPath = temp.GetFullPath() def __eq__(self, other) -> bool: """ 判断文件路径是否相等 注意字符串可能不同,因为文件夹路径后缀的斜线可能被忽略 Args: other: 另一个文件对象或路径字符串 Returns: 是否相等 """ if other is None: return False # 获取比较对象的路径 other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other) self_path = self.OriginFullPath # 如果两个文件都存在,则直接比较路径 if self.Exists() == True and other.Exists() == True: return self_path.strip('\\/') == other_path.strip('\\/') # 如果一个文件存在另一个不被判定为存在则一定不同 elif self.Exists() != other.Exists(): return False # 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串 else: return self_path.replace('/','\\') == other_path.replace('/','\\') def ToPath(self): return Path(self.OriginFullPath) def __Path__(self): return Path(self.OriginFullPath) def Create(self): if self.Exists() == False: if self.IsDir(): if os.path.exists(self.GetDir()): os.makedirs(self.OriginFullPath) else: raise FileNotFoundError(f"{self.OriginFullPath} cannt create, because its parent path is not exist") else: with open(self.OriginFullPath, 'w') as f: f.write('') return self def Exists(self): return os.path.exists(self.OriginFullPath) def Remove(self): if self.Exists(): if self.IsDir(): shutil.rmtree(self.OriginFullPath) else: os.remove(self.OriginFullPath) return self def Copy(self, targetPath:Optional[Union[Self, str]]=None): if targetPath is None: return ToolFile(self.OriginFullPath) if self.Exists() == False: raise FileNotFoundError("file not found") target_file = ToolFile(str(targetPath)) if target_file.IsDir(): target_file = target_file|self.GetFilename() shutil.copy(self.OriginFullPath, str(target_file)) return target_file def Move(self, targetPath:Union[Self, str]): if self.Exists() is False: raise FileNotFoundError("file not found") target_file = ToolFile(str(targetPath)) if target_file.IsDir(): target_file = target_file|self.GetFilename() shutil.move(self.OriginFullPath, str(target_file)) self.OriginFullPath = target_file.OriginFullPath return self def Rename(self, newpath:Union[Self, str]): if self.Exists() is False: raise FileNotFoundError("file not found") newpath = str(newpath) if '\\' in newpath or '/' in newpath: newpath = GetBaseFilename(newpath) new_current_path = os.path.join(self.GetDir(), newpath) os.rename(self.OriginFullPath, new_current_path) self.OriginFullPath = new_current_path return self def LoadAsJson(self, encoding:str='utf-8', **kwargs) -> Any: with open(self.OriginFullPath, 'r', encoding=encoding) as f: json_data = json.load(f, **kwargs) return json_data def LoadAsCsv(self) -> pd.DataFrame: with open(self.OriginFullPath, 'r') as f: return pd.read_csv(f) def LoadAsXml(self) -> pd.DataFrame: with open(self.OriginFullPath, 'r') as f: return pd.read_xml(f) def LoadAsDataframe(self) -> pd.DataFrame: with open(self.OriginFullPath, 'r') as f: return pd.read_csv(f) def LoadAsExcel(self) -> pd.DataFrame: with open(self.OriginFullPath, 'r') as f: return pd.read_excel(f) def LoadAsBinary(self) -> bytes: with open(self.OriginFullPath, 'rb') as f: return f.read() def LoadAsText(self) -> str: with open(self.OriginFullPath, 'r') as f: return f.read() def LoadAsWav(self): return AudioSegment.from_wav(self.OriginFullPath) def LoadAsAudio(self): return AudioSegment.from_file(self.OriginFullPath) def LoadAsImage(self) -> ImageFile.ImageFile: return Image.open(self.OriginFullPath) def LoadAsDocx(self) -> DocumentObject: return Document(self.OriginFullPath) def LoadAsUnknown(self, suffix:str) -> Any: return self.LoadAsText() def LoadAsModel(self, model:type[BaseModel]) -> BaseModel: return model.model_validate(self.LoadAsJson()) def SaveAsJson(self, json_data): try: from pydantic import BaseModel if isinstance(json_data, BaseModel): json_data = json_data.model_dump() json_data["__type"] = f"{self.data.__class__.__name__}, pydantic.BaseModel" except: pass with open(self.OriginFullPath, 'w', encoding='utf-8') as f: json.dump(json_data, f, indent=4) return self def SaveAsCsv(self, csv_data:pd.DataFrame): csv_data.to_csv(self.OriginFullPath) return self def SaveAsXml(self, xml_data:pd.DataFrame): xml_data.to_xml(self.OriginFullPath) return self def SaveAsDataframe(self, dataframe_data:pd.DataFrame): dataframe_data.to_csv(self.OriginFullPath) return self def SaveAsExcel(self, excel_data:pd.DataFrame): excel_data.to_excel(self.OriginFullPath, index=False) return self def SaveAsBinary(self, binary_data:bytes): with open(self.OriginFullPath, 'wb') as f: f.write(binary_data) return self def SaveAsText(self, text_data:str): with open(self.OriginFullPath, 'w') as f: f.writelines(text_data) return self def SaveAsAudio(self, audio_data:AudioSegment): audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath)) return self def SaveAsImage(self, image_data:ImageFile.ImageFile): image_data.save(self.OriginFullPath) return self def SaveAsDocx(self, docx_data:DocumentObject): docx_data.save(self.OriginFullPath) return self def SaveAsUnknown(self, unknown_data:Any): self.SaveAsBinary(unknown_data) def SaveAsModel(self, model:type[BaseModel]): self.SaveAsJson(model) def GetSize(self) -> int: ''' return: return size of directory ''' return os.path.getsize(self.OriginFullPath) def GetExtension(self): return GetExtensionName(self.OriginFullPath) def GetFullPath(self) -> str: return self.OriginFullPath def GetFilename(self, is_without_extension = False): ''' if target path is a file, it return filename if target path is a directory, it return top directory name ''' if is_without_extension and '.' in self.OriginFullPath: return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)] elif self.OriginFullPath[-1] == '/' or self.OriginFullPath[-1] == '\\': return GetBaseFilename(self.OriginFullPath[:-1]) else: return GetBaseFilename(self.OriginFullPath) def GetDir(self): return os.path.dirname(self.OriginFullPath) def GetDirToolFile(self): return ToolFile(self.GetDir()) def GetCurrentDirName(self): return os.path.dirname(self.OriginFullPath) def IsDir(self): if self.OriginFullPath[-1] == '\\' or self.GetFullPath()[-1] == '/': return True else: return os.path.isdir(self.OriginFullPath) def IsFile(self): return os.path.isfile(self.OriginFullPath) def TryCreateParentPath(self): dir_path = os.path.dirname(self.OriginFullPath) if dir_path == '': return self if not os.path.exists(dir_path): os.makedirs(dir_path) return self def DirIter(self): return os.listdir(self.OriginFullPath) def DirToolFileIter(self): result = [self] result.clear() for file in os.listdir(self.OriginFullPath): result.append(self|file) return result def BackToParentDir(self): self.OriginFullPath = self.GetDir() return self def GetParentDir(self): return ToolFile(self.GetDir()) def DirCount(self, ignore_folder:bool = True): iter = self.DirIter() result = 0 for content in iter: if ignore_folder and os.path.isdir(os.path.join(self.OriginFullPath, content)): continue result += 1 return result def DirClear(self): for file in self.DirToolFileIter(): file.Remove() return self def FirstFileWithExtension(self, extension:str): target_dir = self if self.IsDir() else ToolFile(self.GetDir()) for file in target_dir.DirToolFileIter(): if file.IsDir() is False and file.GetExtension() == extension: return file return None def FirstFile(self, pr:Callable[[str], bool]): target_dir = self if self.IsDir() else ToolFile(self.GetDir()) for file in target_dir.DirToolFileIter(): if pr(file.GetFilename()): return file return None def FindFileWithExtension(self, extension:str): target_dir = self if self.IsDir() else ToolFile(self.GetDir()) result:List[ToolFile] = [] for file in target_dir.DirToolFileIter(): if file.IsDir() is False and file.GetExtension() == extension: result.append(file) return result def FindFile(self, pr:Callable[[str], bool]): target_dir = self if self.IsDir() else ToolFile(self.GetDir()) result:List[ToolFile] = [] for file in target_dir.DirToolFileIter(): if pr(file.GetFilename()): result.append(file) return result def DirWalk( self, top, topdown: bool = True, onerror: Optional[Callable] = None, followlinks: bool = False ) -> Iterator[tuple[dir_name_type, list[dir_name_type], list[file_name_type]]]: return os.walk(self.OriginFullPath, top=top, topdown=topdown, onerror=onerror, followlinks=followlinks) def bool(self): return self.Exists() def __bool__(self): return self.Exists() def MustExistsPath(self): self.TryCreateParentPath() self.Create() return self def MakeFileInside(self, data:Self, is_delete_source = False): if self.IsDir() is False: raise Exception("Cannot make file inside a file, because this object target is not a directory") result:ToolFile = self|data.GetFilename() if is_delete_source: data.Move(result) else: data.Copy(result) return self def Compress(self, output_path: Optional[str] = None, format: str = 'zip') -> 'ToolFile': """ 压缩文件或目录 Args: output_path: 输出路径,如果为None则使用原文件名 format: 压缩格式,支持'zip'和'tar' Returns: 压缩后的文件对象 """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") if output_path is None: output_path = self.GetFullPath() + ('.zip' if format == 'zip' else '.tar') try: if format == 'zip': with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: if self.IsDir(): for root, _, files in os.walk(self.GetFullPath()): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, self.GetFullPath()) zipf.write(file_path, arcname) else: zipf.write(self.GetFullPath(), self.GetFilename()) elif format == 'tar': with tarfile.open(output_path, 'w') as tarf: if self.IsDir(): tarf.add(self.GetFullPath(), arcname=self.GetFilename()) else: tarf.add(self.GetFullPath(), arcname=self.GetFilename()) else: raise CompressionError(f"Unsupported compression format: {format}") return ToolFile(output_path) except Exception as e: raise CompressionError(f"Compression failed: {str(e)}") def Decompress(self, output_path: Optional[str] = None) -> 'ToolFile': """ 解压文件 Args: output_path: 输出目录,如果为None则使用原文件名 Returns: 解压后的目录对象 """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") if output_path is None: output_path = self.GetFullPath() + '_extracted' try: if self.GetExtension() == 'zip': with zipfile.ZipFile(self.GetFullPath(), 'r') as zipf: zipf.extractall(output_path) elif self.GetExtension() == 'tar': with tarfile.open(self.GetFullPath(), 'r') as tarf: tarf.extractall(output_path) else: raise CompressionError(f"Unsupported archive format: {self.GetExtension()}") return ToolFile(output_path) except Exception as e: raise CompressionError(f"Decompression failed: {str(e)}") def Encrypt(self, key: str, algorithm: str = 'AES') -> 'ToolFile': """ 加密文件 Args: key: 加密密钥 algorithm: 加密算法,目前支持'AES' Returns: 加密后的文件对象 """ from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: # 生成加密密钥 salt = os.urandom(16) kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) key = base64.urlsafe_b64encode(kdf.derive(key.encode())) # 创建加密器 f = Fernet(key) # 读取文件内容 with open(self.GetFullPath(), 'rb') as file: file_data = file.read() # 加密数据 encrypted_data = f.encrypt(file_data) # 保存加密后的文件 encrypted_path = self.GetFullPath() + '.encrypted' with open(encrypted_path, 'wb') as file: file.write(salt + encrypted_data) return ToolFile(encrypted_path) except Exception as e: raise EncryptionError(f"Encryption failed: {str(e)}") def decrypt(self, key: str, algorithm: str = 'AES') -> Self: """ 解密文件 Args: key: 解密密钥 algorithm: 解密算法,目前支持'AES' Returns: 解密后的文件对象 """ from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: # 读取加密文件 with open(self.GetFullPath(), 'rb') as file: file_data = file.read() # 提取salt和加密数据 salt = file_data[:16] encrypted_data = file_data[16:] # 生成解密密钥 kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) key = base64.urlsafe_b64encode(kdf.derive(key.encode())) # 创建解密器 f = Fernet(key) # 解密数据 decrypted_data = f.decrypt(encrypted_data) # 保存解密后的文件 decrypted_path = self.GetFullPath() + '.decrypted' with open(decrypted_path, 'wb') as file: file.write(decrypted_data) return ToolFile(decrypted_path) except Exception as e: raise EncryptionError(f"Decryption failed: {str(e)}") def calculate_hash(self, algorithm: str = 'md5', chunk_size: int = 8192) -> str: """ 计算文件的哈希值 Args: algorithm: 哈希算法,支持'md5', 'sha1', 'sha256', 'sha512'等 chunk_size: 每次读取的字节数 Returns: 文件的哈希值(十六进制字符串) """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: # 获取哈希算法 hash_algo = getattr(hashlib, algorithm.lower()) if not hash_algo: raise HashError(f"Unsupported hash algorithm: {algorithm}") # 创建哈希对象 hasher = hash_algo() # 分块读取文件并更新哈希值 with open(self.GetFullPath(), 'rb') as f: while chunk := f.read(chunk_size): hasher.update(chunk) return hasher.hexdigest() except Exception as e: raise HashError(f"Hash calculation failed: {str(e)}") def verify_hash(self, expected_hash: str, algorithm: str = 'md5') -> bool: """ 验证文件哈希值 Args: expected_hash: 期望的哈希值 algorithm: 哈希算法,支持'md5', 'sha1', 'sha256', 'sha512'等 Returns: 是否匹配 """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: actual_hash = self.calculate_hash(algorithm) return actual_hash.lower() == expected_hash.lower() except Exception as e: raise HashError(f"Hash verification failed: {str(e)}") def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> Self: """ 保存文件的哈希值到文件 Args: algorithm: 哈希算法 output_path: 输出文件路径,如果为None则使用原文件名 Returns: 哈希值文件对象 """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: # 计算哈希值 hash_value = self.calculate_hash(algorithm) # 生成输出路径 if output_path is None: output_path = self.GetFullPath() + f'.{algorithm}' # 保存哈希值 with open(output_path, 'w') as f: f.write(f"{hash_value} *{self.GetFilename()}") return ToolFile(output_path) except Exception as e: raise HashError(f"Hash saving failed: {str(e)}") def start_monitoring( self, callback: Callable[[str, str], None], recursive: bool = False, ignore_patterns: Optional[List[str]] = None, ignore_directories: bool = False, case_sensitive: bool = True, is_log: bool = True ) -> None: """ 开始监控文件或目录的变化 Args: callback: 回调函数,接收事件类型和路径两个参数 recursive: 是否递归监控子目录 ignore_patterns: 忽略的文件模式列表 ignore_directories: 是否忽略目录事件 case_sensitive: 是否区分大小写 """ from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: class EventHandler(FileSystemEventHandler): def __init__(self, callback, ignore_patterns, ignore_directories, case_sensitive): self.callback = callback self.ignore_patterns = ignore_patterns or [] self.ignore_directories = ignore_directories self.case_sensitive = case_sensitive def should_ignore(self, path: str) -> bool: if self.ignore_directories and os.path.isdir(path): return True if not self.case_sensitive: path = path.lower() return any(pattern in path for pattern in self.ignore_patterns) def on_created(self, event): if not self.should_ignore(event.src_path): self.callback('created', event.src_path) def on_modified(self, event): if not self.should_ignore(event.src_path): self.callback('modified', event.src_path) def on_deleted(self, event): if not self.should_ignore(event.src_path): self.callback('deleted', event.src_path) def on_moved(self, event): if not self.should_ignore(event.src_path): self.callback('moved', f"{event.src_path} -> {event.dest_path}") # 创建事件处理器 event_handler = EventHandler( callback=callback, ignore_patterns=ignore_patterns, ignore_directories=ignore_directories, case_sensitive=case_sensitive ) # 创建观察者 observer = Observer() observer.schedule(event_handler, self.GetFullPath(), recursive=recursive) # 启动监控 observer.start() if is_log: print(f"Started monitoring {self.GetFullPath()}") try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() if is_log: print("Stopped monitoring") observer.join() except Exception as e: raise FileMonitorError(f"Failed to start monitoring: {str(e)}") def create_backup( self, backup_dir: Optional[str] = None, max_backups: int = 5, backup_format: str = 'zip', include_metadata: bool = True ) -> Self: """ 创建文件或目录的备份 Args: backup_dir: 备份目录,如果为None则使用原目录下的.backup目录 max_backups: 最大保留备份数量 backup_format: 备份格式,支持'zip'和'tar' include_metadata: 是否包含元数据 Returns: 备份文件对象 """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: # 生成备份目录 if backup_dir is None: backup_dir = os.path.join(self.GetDir(), '.backup') backup_dir:Self = ToolFile(backup_dir) backup_dir.MustExistsPath() # 生成备份文件名 timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') backup_name = f"{self.GetFilename()}_{timestamp}" # 创建备份 if backup_format == 'zip': backup_path = backup_dir | f"{backup_name}.zip" with zipfile.ZipFile(backup_path.GetFullPath(), 'w', zipfile.ZIP_DEFLATED) as zipf: if self.IsDir(): for root, _, files in os.walk(self.GetFullPath()): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, self.GetFullPath()) zipf.write(file_path, arcname) else: zipf.write(self.GetFullPath(), self.GetFilename()) elif backup_format == 'tar': backup_path = backup_dir | f"{backup_name}.tar" with tarfile.open(backup_path.GetFullPath(), 'w') as tarf: if self.IsDir(): tarf.add(self.GetFullPath(), arcname=self.GetFilename()) else: tarf.add(self.GetFullPath(), arcname=self.GetFilename()) else: raise BackupError(f"Unsupported backup format: {backup_format}") # 添加元数据 if include_metadata: metadata = { 'original_path': self.GetFullPath(), 'backup_time': timestamp, 'file_size': self.get_size(), 'is_directory': self.IsDir(), 'hash': self.calculate_hash() } metadata_path = backup_dir | f"{backup_name}.meta.json" with open(metadata_path.GetFullPath(), 'w') as f: json.dump(metadata, f, indent=4) # 清理旧备份 if max_backups > 0: backups = backup_dir.find_file(lambda f: ToolFile(f).GetFilename().startswith(self.GetFilename() + '_')) backups.sort(key=lambda f: f.GetFilename(), reverse=True) for old_backup in backups[max_backups:]: old_backup.Remove() return backup_path except Exception as e: raise BackupError(f"Backup failed: {str(e)}") def restore_backup( self, backup_file: Union[str, Self], restore_path: Optional[str] = None, verify_hash: bool = True ) -> Self: """ 从备份恢复文件或目录 Args: backup_file: 备份文件路径 restore_path: 恢复路径,如果为None则恢复到原位置 verify_hash: 是否验证哈希值 Returns: 恢复后的文件对象 """ if not isinstance(backup_file, ToolFile): backup_file:Self = ToolFile(backup_file) if not backup_file.Exists(): raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}") try: # 确定恢复路径 if restore_path is None: restore_path = self.GetFullPath() restore_path:Self = ToolFile(restore_path) # 解压备份 if backup_file.get_extension() == 'zip': with zipfile.ZipFile(backup_file.GetFullPath(), 'r') as zipf: zipf.extractall(restore_path.GetFullPath()) elif backup_file.get_extension() == 'tar': with tarfile.open(backup_file.GetFullPath(), 'r') as tarf: tarf.extractall(restore_path.GetFullPath()) else: raise BackupError(f"Unsupported backup format: {backup_file.get_extension()}") # 验证哈希值 if verify_hash: metadata_path = backup_file.GetFullPath()[:-len(backup_file.get_extension())-1] + '.meta.json' if os.path.exists(metadata_path): with open(metadata_path, 'r') as f: metadata = json.load(f) restored_file = ToolFile(restore_path.GetFullPath()) if restored_file.calculate_hash() != metadata['hash']: raise BackupError("Hash verification failed") return restore_path except Exception as e: raise BackupError(f"Restore failed: {str(e)}") def list_backups(self) -> List[Self]: """ 列出所有备份 Returns: 备份文件列表 """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: backup_dir:Self = ToolFile(os.path.join(self.GetDir(), '.backup')) if not backup_dir.Exists(): return [] backups = backup_dir.find_file(lambda f: ToolFile(f).GetFilename().startswith(self.GetFilename() + '_')) backups.sort(key=lambda f: ToolFile(f).GetFilename(), reverse=True) return backups except Exception as e: raise BackupError(f"Failed to list backups: {str(e)}") def get_permissions(self) -> Dict[str, bool]: """ 获取文件或目录的权限 Returns: 权限字典,包含以下键: - read: 是否可读 - write: 是否可写 - execute: 是否可执行 - hidden: 是否隐藏 """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: mode = os.stat(self.GetFullPath()).st_mode return { 'read': bool(mode & stat.S_IRUSR), 'write': bool(mode & stat.S_IWUSR), 'execute': bool(mode & stat.S_IXUSR), 'hidden': bool(os.path.isfile(self.GetFullPath()) and self.GetFilename().startswith('.')) } except Exception as e: raise PermissionError(f"Failed to get permissions: {str(e)}") def set_permissions( self, read: Optional[bool] = None, write: Optional[bool] = None, execute: Optional[bool] = None, hidden: Optional[bool] = None, recursive: bool = False ) -> Self: """ 设置文件或目录的权限 Args: read: 是否可读 write: 是否可写 execute: 是否可执行 hidden: 是否隐藏 recursive: 是否递归设置目录权限 Returns: 文件对象本身 """ if not self.Exists(): raise FileNotFoundError(f"File not found: {self.GetFullPath()}") try: # 获取当前权限 current_perms = os.stat(self.GetFullPath()).st_mode # 设置新权限 if read is not None: if read: current_perms |= stat.S_IRUSR else: current_perms &= ~stat.S_IRUSR if write is not None: if write: current_perms |= stat.S_IWUSR else: current_perms &= ~stat.S_IWUSR if execute is not None: if execute: current_perms |= stat.S_IXUSR else: current_perms &= ~stat.S_IXUSR # 应用权限 os.chmod(self.GetFullPath(), current_perms) # 设置隐藏属性 if hidden is not None: if os.name == 'nt': # Windows import ctypes if hidden: ctypes.windll.kernel32.SetFileAttributesW(self.GetFullPath(), 2) else: ctypes.windll.kernel32.SetFileAttributesW(self.GetFullPath(), 0) else: # Unix/Linux/Mac if hidden: if not self.GetFilename().startswith('.'): self.Rename('.' + self.GetFilename()) else: if self.GetFilename().startswith('.'): self.Rename(self.GetFilename()[1:]) # 递归设置目录权限 if recursive and self.IsDir(): for root, _, files in os.walk(self.GetFullPath()): for file in files: file_path = os.path.join(root, file) if read is not None: if read: os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IRUSR) else: os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IRUSR) if write is not None: if write: os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IWUSR) else: os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IWUSR) if execute is not None: if execute: os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IXUSR) else: os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IXUSR) return self except Exception as e: raise PermissionError(f"Failed to set permissions: {str(e)}") def is_readable(self) -> bool: """ 检查文件是否可读 Returns: 是否可读 """ return self.get_permissions()['read'] def is_writable(self) -> bool: """ 检查文件是否可写 Returns: 是否可写 """ return self.get_permissions()['write'] def is_executable(self) -> bool: """ 检查文件是否可执行 Returns: 是否可执行 """ return self.get_permissions()['execute'] def is_hidden(self) -> bool: """ 检查文件是否隐藏 Returns: 是否隐藏 """ return self.get_permissions()['hidden']