# NOTE(review): removed non-Python residue (a web file-viewer header: file size,
# "Raw Permalink Blame History", Unicode warning banner) that was pasted above
# this module's imports.
import os.path
from .Config import *
import json
import shutil
import os
import zipfile
import tarfile
import base64
import hashlib
import time
import datetime
import stat
from typing import *
from pathlib import Path
def GetExtensionName(file:str):
    """Return the extension of *file* without the leading dot ('' when absent)."""
    _, extension = os.path.splitext(file)
    return extension[1:]
def GetBaseFilename(file:str):
    """Return the final path component of *file*."""
    return os.path.basename(file)
# Type aliases used in directory-walk signatures below; both are plain strings.
dir_name_type = str
file_name_type = str
class FileOperationError(Exception):
    """Base class for file-operation errors raised by this module."""
    pass
class CompressionError(FileOperationError):
    """Raised when compressing or decompressing an archive fails."""
    pass
class EncryptionError(FileOperationError):
    """Raised when encrypting or decrypting a file fails."""
    pass
class HashError(FileOperationError):
    """Raised when computing, verifying or saving a file hash fails."""
    pass
class FileMonitorError(FileOperationError):
    """Raised when file-system monitoring cannot be started."""
    pass
class BackupError(FileOperationError):
    """Raised when creating, restoring or listing backups fails."""
    pass
# NOTE(review): this class shadows the builtin ``PermissionError`` for the rest
# of this module - later ``raise PermissionError(...)`` calls raise THIS class.
# Renaming would change the exception type callers catch, so it is only flagged.
class PermissionError(FileOperationError):
    """Raised when reading or changing file permissions fails."""
    pass
# ``ImportingThrow`` comes from ``.Config`` (star import above); it reports the
# missing optional dependency and aborts.
try:
    from pydantic import BaseModel
except ImportError as e:
    ImportingThrow(e, "File", ["pydantic"])
class ToolFile(BaseModel):
    """Path wrapper offering load/save, copy/move, archive, hash, backup and
    permission helpers around a single file-system path string."""
    # The (possibly relative) path this object operates on.
    OriginFullPath:str
    def __init__(
        self,
        filePath: Union[str, Self],
    ):
        # Expand $VAR / %VAR% style environment variables first.
        filePath = os.path.expandvars(str(filePath))
        # NOTE(review): ':' is used as a drive-letter heuristic (e.g. "E:/x"),
        # so such paths are absolutized - confirm this is intended on POSIX,
        # where ':' may legally appear inside a file name.
        if ":" in filePath:
            filePath = os.path.abspath(filePath)
        super().__init__(OriginFullPath=filePath)
def __del__(self):
pass
def __str__(self):
return self.GetFullPath()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return True
    def __or__(self, other):
        """Path-join operator: ``ToolFile("a") | "b"`` -> ``ToolFile("a/b")``.

        With ``other is None`` it returns a copy whose path carries a trailing
        separator when *self* is not already a directory path.
        """
        if other is None:
            return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}{PlatformIndicator.GetFileSeparator()}")
        else:
            # os.path.join is deliberately avoided because of this behavior:
            # when a later component looks absolute it discards the earlier
            # ones, e.g. os.path.join("E:/dev", "/analyze/") == "E:/analyze/",
            # whereas "E:/dev/analyze" is wanted here.
            separator = PlatformIndicator.GetFileSeparator()
            separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
            first = self.GetFullPath().replace(separator_not_this_platform,separator).strip(separator)
            second = str(other).replace(separator_not_this_platform,separator)
            # NOTE(review): ``first`` has already had separators stripped above,
            # so it seemingly can never equal "./" or "../" here - these two
            # branches look unreachable; confirm the original intent.
            if first == "./":
                return ToolFile(f"{second}")
            elif first == "../":
                first = ToolFile(f"{os.path.abspath(first)}").BackToParentDir()
            return ToolFile(f"{first}{separator}{second}")
def __idiv__(self, other):
temp = self.__or__(other)
self.OriginFullPath = temp.GetFullPath()
def __eq__(self, other) -> bool:
"""
判断文件路径是否相等
注意字符串可能不同,因为文件夹路径后缀的斜线可能被忽略
Args:
other: 另一个文件对象或路径字符串
Returns:
是否相等
"""
if other is None:
return False
# 获取比较对象的路径
other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other)
self_path = self.OriginFullPath
separator = PlatformIndicator.GetFileSeparator()
separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
# 如果两个文件都存在,则直接比较路径
if self.Exists() == True and other.Exists() == True:
return self_path.strip(separator_not_this_platform) == other_path.strip(separator_not_this_platform)
# 如果一个文件存在另一个不被判定为存在则一定不同
elif self.Exists() != other.Exists():
return False
# 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串
else:
return self_path.replace(separator_not_this_platform,separator) == other_path.replace(separator_not_this_platform,separator)
def ToPath(self):
return Path(self.OriginFullPath)
def __Path__(self):
return Path(self.OriginFullPath)
def Create(self):
if self.Exists() == False:
if self.IsDir():
if os.path.exists(self.GetDir()):
os.makedirs(self.OriginFullPath)
else:
raise FileNotFoundError(f"{self.OriginFullPath} cannt create, because its parent path is not exist")
else:
with open(self.OriginFullPath, 'w') as f:
f.write('')
return self
def Exists(self):
return os.path.exists(self.OriginFullPath)
def Remove(self):
if self.Exists():
if self.IsDir():
shutil.rmtree(self.OriginFullPath)
else:
os.remove(self.OriginFullPath)
return self
def Copy(self, targetPath:Optional[Union[Self, str]]=None):
if targetPath is None:
return ToolFile(self.OriginFullPath)
if self.Exists() == False:
raise FileNotFoundError("file not found")
target_file = ToolFile(str(targetPath))
if target_file.IsDir():
target_file = target_file|self.GetFilename()
shutil.copy(self.OriginFullPath, str(target_file))
return target_file
def Move(self, targetPath:Union[Self, str]):
if self.Exists() is False:
raise FileNotFoundError("file not found")
target_file = ToolFile(str(targetPath))
if target_file.IsDir():
target_file = target_file|self.GetFilename()
shutil.move(self.OriginFullPath, str(target_file))
self.OriginFullPath = target_file.OriginFullPath
return self
def Rename(self, newpath:Union[Self, str]):
if self.Exists() is False:
raise FileNotFoundError("file not found")
newpath = str(newpath)
if PlatformIndicator.GetFileSeparator() in newpath or PlatformIndicator.GetFileSeparator(True) in newpath:
newpath = GetBaseFilename(newpath)
new_current_path = os.path.join(self.GetDir(), newpath)
os.rename(self.OriginFullPath, new_current_path)
self.OriginFullPath = new_current_path
return self
def LoadAsJson(self, encoding:str='utf-8', **kwargs) -> Any:
with open(self.OriginFullPath, 'r', encoding=encoding) as f:
json_data = json.load(f, **kwargs)
return json_data
def LoadAsCsv(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
def LoadAsXml(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_xml(f)
def LoadAsDataframe(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
def LoadAsExcel(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_excel(f)
def LoadAsBinary(self) -> bytes:
with open(self.OriginFullPath, 'rb') as f:
return f.read()
def LoadAsText(self) -> str:
with open(self.OriginFullPath, 'r') as f:
return f.read()
def LoadAsWav(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_wav(self.OriginFullPath)
def LoadAsAudio(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_file(self.OriginFullPath)
def LoadAsImage(self):
try:
from PIL import Image
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
return Image.open(self.OriginFullPath)
def LoadAsDocx(self) -> "docx.document.Document":
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
return Document(self.OriginFullPath)
def LoadAsUnknown(self, suffix:str) -> Any:
return self.LoadAsText()
def LoadAsModel(self, model:type["BaseModel"]) -> "BaseModel":
return model.model_validate(self.LoadAsJson())
def ReadLines(self, **kwargs):
with open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = f.readline()
if not line or line == '':
break
yield line
async def ReadLinesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = await f.readline()
if not line or line == '':
break
yield line
def ReadBytes(self, **kwargs):
with open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = f.read(1024)
if not data or data == '':
break
yield data
async def ReadBytesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = await f.read(1024)
if not data or data == '':
break
yield data
def WriteBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'wb', **kwargs) as f:
f.write(data)
async def WriteBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'wb', **kwargs) as f:
await f.write(data)
def WriteLines(self, data:List[str], **kwargs):
with open(self.OriginFullPath, 'w', **kwargs) as f:
f.writelines(data)
async def WriteLinesAsync(self, data:List[str], **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'w', **kwargs) as f:
await f.writelines(data)
def AppendText(self, data:str, **kwargs):
with open(self.OriginFullPath, 'a', **kwargs) as f:
f.write(data)
async def AppendTextAsync(self, data:str, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'a', **kwargs) as f:
await f.write(data)
def AppendBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'ab', **kwargs) as f:
f.write(data)
async def AppendBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'ab', **kwargs) as f:
await f.write(data)
def SaveAsJson(self, json_data):
try:
from pydantic import BaseModel
if isinstance(json_data, BaseModel):
json_data = json_data.model_dump()
json_data["__type"] = f"{self.data.__class__.__name__}, pydantic.BaseModel"
except:
pass
with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=4)
return self
def SaveAsCsv(self, csv_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
csv_data.to_csv(self.OriginFullPath)
return self
def SaveAsXml(self, xml_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
xml_data.to_xml(self.OriginFullPath)
return self
def SaveAsDataframe(self, dataframe_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
dataframe_data.to_csv(self.OriginFullPath)
return self
def SaveAsExcel(self, excel_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
excel_data.to_excel(self.OriginFullPath, index=False)
return self
def SaveAsBinary(self, binary_data:bytes):
with open(self.OriginFullPath, 'wb') as f:
f.write(binary_data)
return self
def SaveAsText(self, text_data:str):
with open(self.OriginFullPath, 'w') as f:
f.writelines(text_data)
return self
def SaveAsAudio(self, audio_data:"AudioSegment"):
'''
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
'''
audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath))
return self
def SaveAsImage(self, image_data:"ImageFile.ImageFile"):
'''
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
'''
image_data.save(self.OriginFullPath)
return self
def SaveAsDocx(self, docx_data:"DocumentObject"):
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
docx_data.save(self.OriginFullPath)
return self
def SaveAsUnknown(self, unknown_data:Any):
self.SaveAsBinary(unknown_data)
def SaveAsModel(self, model:type[BaseModel]):
self.SaveAsJson(model)
def GetSize(self) -> int:
'''
return:
return size of directory
'''
return os.path.getsize(self.OriginFullPath)
def GetExtension(self):
return GetExtensionName(self.OriginFullPath)
def GetAbsPath(self) -> str:
return os.path.abspath(self.OriginFullPath)
def GetFullPath(self) -> str:
return self.OriginFullPath
    def GetFilename(self, is_without_extension = False):
        '''
        Last path component.

        If the target path is a file this is the file name; for a directory
        path (trailing separator) it is the top directory name.

        NOTE(review): with ``is_without_extension`` the trigger is ``'.' in``
        the WHOLE path, so a dot anywhere in a parent directory name also takes
        that branch - confirm this is intended.
        '''
        if is_without_extension and '.' in self.OriginFullPath:
            # Drop ".<ext>" from the base name.
            return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)]
        elif self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator(True):
            # Trailing separator: return the directory's own name.
            return GetBaseFilename(self.OriginFullPath[:-1])
        else:
            return GetBaseFilename(self.OriginFullPath)
    def GetDir(self):
        """Directory part of this path (``os.path.dirname``)."""
        return os.path.dirname(self.OriginFullPath)
    def GetDirToolFile(self):
        """Directory part wrapped in a new ToolFile."""
        return ToolFile(self.GetDir())
    def GetCurrentDirName(self):
        # NOTE(review): identical to GetDir() - despite the name it returns the
        # full dirname, not just the last directory component; confirm intent.
        return os.path.dirname(self.OriginFullPath)
def IsDir(self):
if self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.GetFullPath()[-1] == PlatformIndicator.GetFileSeparator(True):
return True
else:
return os.path.isdir(self.OriginFullPath)
def IsFile(self):
return os.path.isfile(self.OriginFullPath)
def TryCreateParentPath(self):
dir_path = os.path.dirname(self.OriginFullPath)
if dir_path == '':
return self
if not os.path.exists(dir_path):
os.makedirs(dir_path)
return self
def DirIter(self):
return os.listdir(self.OriginFullPath)
def DirToolFileIter(self):
result = [self]
result.clear()
for file in os.listdir(self.OriginFullPath):
result.append(self|file)
return result
def BackToParentDir(self):
self.OriginFullPath = self.GetDir()
return self
def GetParentDir(self):
return ToolFile(self.GetDir())
def DirCount(self, ignore_folder:bool = True):
iter = self.DirIter()
result = 0
for content in iter:
if ignore_folder and os.path.isdir(os.path.join(self.OriginFullPath, content)):
continue
result += 1
return result
def DirClear(self):
for file in self.DirToolFileIter():
file.Remove()
return self
def FirstFileWithExtension(self, extension:str):
target_dir = self if self.IsDir() else ToolFile(self.GetDir())
for file in target_dir.DirToolFileIter():
if file.IsDir() is False and file.GetExtension() == extension:
return file
return None
def FirstFile(self, pr:Callable[[str], bool]):
target_dir = self if self.IsDir() else ToolFile(self.GetDir())
for file in target_dir.DirToolFileIter():
if pr(file.GetFilename()):
return file
return None
def FindFileWithExtension(self, extension:str):
target_dir = self if self.IsDir() else ToolFile(self.GetDir())
result:List[ToolFile] = []
for file in target_dir.DirToolFileIter():
if file.IsDir() is False and file.GetExtension() == extension:
result.append(file)
return result
def FindFile(self, pr:Callable[[str], bool]):
target_dir = self if self.IsDir() else ToolFile(self.GetDir())
result:List[ToolFile] = []
for file in target_dir.DirToolFileIter():
if pr(file.GetFilename()):
result.append(file)
return result
def DirWalk(
self,
top,
topdown: bool = True,
onerror: Optional[Callable] = None,
followlinks: bool = False
) -> Iterator[tuple[dir_name_type, list[dir_name_type], list[file_name_type]]]:
return os.walk(self.OriginFullPath, top=top, topdown=topdown, onerror=onerror, followlinks=followlinks)
def bool(self):
return self.Exists()
def __bool__(self):
return self.Exists()
def MustExistsPath(self):
self.TryCreateParentPath()
self.Create()
return self
def MakeFileInside(self, data:Self, is_delete_source = False):
if self.IsDir() is False:
raise Exception("Cannot make file inside a file, because this object target is not a directory")
result:ToolFile = self|data.GetFilename()
if is_delete_source:
data.Move(result)
else:
data.Copy(result)
return self
def Compress(self, output_path: Optional[str] = None, format: str = 'zip') -> 'ToolFile':
"""
压缩文件或目录
Args:
output_path: 输出路径,如果为None则使用原文件名
format: 压缩格式,支持'zip''tar'
Returns:
压缩后的文件对象
"""
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
if output_path is None:
output_path = self.GetFullPath() + ('.zip' if format == 'zip' else '.tar')
try:
if format == 'zip':
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
if self.IsDir():
for root, _, files in os.walk(self.GetFullPath()):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, self.GetFullPath())
zipf.write(file_path, arcname)
else:
zipf.write(self.GetFullPath(), self.GetFilename())
elif format == 'tar':
with tarfile.open(output_path, 'w') as tarf:
if self.IsDir():
tarf.add(self.GetFullPath(), arcname=self.GetFilename())
else:
tarf.add(self.GetFullPath(), arcname=self.GetFilename())
else:
raise CompressionError(f"Unsupported compression format: {format}")
return ToolFile(output_path)
except Exception as e:
raise CompressionError(f"Compression failed: {str(e)}")
    def Decompress(self, output_path: Optional[str] = None) -> 'ToolFile':
        """
        Extract this archive (zip or tar, chosen by file extension).

        Args:
            output_path: destination directory; defaults to '<path>_extracted'
        Returns:
            ToolFile of the extraction directory
        Raises:
            FileNotFoundError: when this path does not exist
            CompressionError: on unsupported extension or any failure

        NOTE(review): ``tarfile.extractall`` runs without a ``filter``; for
        untrusted archives this permits path-traversal member names - consider
        ``filter='data'`` (Python 3.12+).
        """
        if not self.Exists():
            raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
        if output_path is None:
            output_path = self.GetFullPath() + '_extracted'
        try:
            if self.GetExtension() == 'zip':
                with zipfile.ZipFile(self.GetFullPath(), 'r') as zipf:
                    zipf.extractall(output_path)
            elif self.GetExtension() == 'tar':
                with tarfile.open(self.GetFullPath(), 'r') as tarf:
                    tarf.extractall(output_path)
            else:
                raise CompressionError(f"Unsupported archive format: {self.GetExtension()}")
            return ToolFile(output_path)
        except Exception as e:
            raise CompressionError(f"Decompression failed: {str(e)}")
    def Encrypt(self, key: str, algorithm: str = 'AES') -> 'ToolFile':
        """
        Encrypt this file with a password-derived Fernet key.

        Args:
            key: passphrase; the key is derived via PBKDF2-HMAC-SHA256
            algorithm: nominally 'AES' (currently unused - Fernet is always used)
        Returns:
            ToolFile of '<path>.encrypted' (16-byte salt prepended to the payload)
        Raises:
            FileNotFoundError: when this path does not exist
            EncryptionError: on any failure
        """
        from cryptography.fernet import Fernet
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
        if not self.Exists():
            raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
        try:
            # Derive the encryption key from the passphrase with a fresh salt.
            salt = os.urandom(16)
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=salt,
                iterations=100000,
            )
            key = base64.urlsafe_b64encode(kdf.derive(key.encode()))
            # Build the symmetric cipher.
            f = Fernet(key)
            # Read the plaintext.
            with open(self.GetFullPath(), 'rb') as file:
                file_data = file.read()
            # Encrypt.
            encrypted_data = f.encrypt(file_data)
            # Persist salt + ciphertext so decrypt() can re-derive the key.
            encrypted_path = self.GetFullPath() + '.encrypted'
            with open(encrypted_path, 'wb') as file:
                file.write(salt + encrypted_data)
            return ToolFile(encrypted_path)
        except Exception as e:
            raise EncryptionError(f"Encryption failed: {str(e)}")
    def decrypt(self, key: str, algorithm: str = 'AES') -> Self:
        """
        Decrypt a file produced by :meth:`Encrypt`.

        Args:
            key: passphrase; the Fernet key is re-derived with the salt stored
                in the file's first 16 bytes
            algorithm: nominally 'AES' (currently unused - Fernet is always used)
        Returns:
            ToolFile of '<path>.decrypted'
        Raises:
            FileNotFoundError: when this path does not exist
            EncryptionError: on any failure

        NOTE(review): lower-case name breaks the PascalCase convention of the
        surrounding methods; renaming would break callers, so only flagged.
        """
        from cryptography.fernet import Fernet
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
        if not self.Exists():
            raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
        try:
            # Read the whole encrypted file.
            with open(self.GetFullPath(), 'rb') as file:
                file_data = file.read()
            # Split the stored salt from the ciphertext.
            salt = file_data[:16]
            encrypted_data = file_data[16:]
            # Re-derive the key exactly as Encrypt() did.
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=salt,
                iterations=100000,
            )
            key = base64.urlsafe_b64encode(kdf.derive(key.encode()))
            # Build the cipher.
            f = Fernet(key)
            # Decrypt.
            decrypted_data = f.decrypt(encrypted_data)
            # Write the plaintext next to the source file.
            decrypted_path = self.GetFullPath() + '.decrypted'
            with open(decrypted_path, 'wb') as file:
                file.write(decrypted_data)
            return ToolFile(decrypted_path)
        except Exception as e:
            raise EncryptionError(f"Decryption failed: {str(e)}")
def calculate_hash(self, algorithm: str = 'md5', chunk_size: int = 8192) -> str:
"""
计算文件的哈希值
Args:
algorithm: 哈希算法,支持'md5', 'sha1', 'sha256', 'sha512'
chunk_size: 每次读取的字节数
Returns:
文件的哈希值(十六进制字符串)
"""
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
try:
# 获取哈希算法
hash_algo = getattr(hashlib, algorithm.lower())
if not hash_algo:
raise HashError(f"Unsupported hash algorithm: {algorithm}")
# 创建哈希对象
hasher = hash_algo()
# 分块读取文件并更新哈希值
with open(self.GetFullPath(), 'rb') as f:
while chunk := f.read(chunk_size):
hasher.update(chunk)
return hasher.hexdigest()
except Exception as e:
raise HashError(f"Hash calculation failed: {str(e)}")
def verify_hash(self, expected_hash: str, algorithm: str = 'md5') -> bool:
"""
验证文件哈希值
Args:
expected_hash: 期望的哈希值
algorithm: 哈希算法,支持'md5', 'sha1', 'sha256', 'sha512'
Returns:
是否匹配
"""
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
try:
actual_hash = self.calculate_hash(algorithm)
return actual_hash.lower() == expected_hash.lower()
except Exception as e:
raise HashError(f"Hash verification failed: {str(e)}")
def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> Self:
"""
保存文件的哈希值到文件
Args:
algorithm: 哈希算法
output_path: 输出文件路径,如果为None则使用原文件名
Returns:
哈希值文件对象
"""
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
try:
# 计算哈希值
hash_value = self.calculate_hash(algorithm)
# 生成输出路径
if output_path is None:
output_path = self.GetFullPath() + f'.{algorithm}'
# 保存哈希值
with open(output_path, 'w') as f:
f.write(f"{hash_value} *{self.GetFilename()}")
return ToolFile(output_path)
except Exception as e:
raise HashError(f"Hash saving failed: {str(e)}")
    def start_monitoring(
        self,
        callback: Callable[[str, str], None],
        recursive: bool = False,
        ignore_patterns: Optional[List[str]] = None,
        ignore_directories: bool = False,
        case_sensitive: bool = True,
        is_log: bool = True
    ) -> None:
        """
        Watch this file/directory for changes via watchdog.

        NOTE(review): this call BLOCKS the calling thread in an endless
        ``time.sleep`` loop until KeyboardInterrupt - it cannot be stopped
        programmatically; confirm this is the intended API.

        Args:
            callback: receives (event_type, path); event_type is one of
                'created' / 'modified' / 'deleted' / 'moved'
            recursive: also watch subdirectories
            ignore_patterns: substrings; a path containing any of them is skipped
            ignore_directories: skip events whose path is a directory
            case_sensitive: lower-case paths before pattern matching when False
            is_log: print start/stop messages
        Raises:
            FileNotFoundError: when this path does not exist
            FileMonitorError: on any failure while setting up or running
        """
        try:
            from watchdog.observers import Observer
            from watchdog.events import FileSystemEventHandler
        except ImportError as e:
            ImportingThrow(e, "File", ["watchdog"])
        if not self.Exists():
            raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
        try:
            class EventHandler(FileSystemEventHandler):
                # Adapter translating watchdog events into the user callback.
                def __init__(self, callback, ignore_patterns, ignore_directories, case_sensitive):
                    self.callback = callback
                    self.ignore_patterns = ignore_patterns or []
                    self.ignore_directories = ignore_directories
                    self.case_sensitive = case_sensitive
                def should_ignore(self, path: str) -> bool:
                    # Substring (not glob) matching against ignore_patterns.
                    if self.ignore_directories and os.path.isdir(path):
                        return True
                    if not self.case_sensitive:
                        path = path.lower()
                    return any(pattern in path for pattern in self.ignore_patterns)
                def on_created(self, event):
                    if not self.should_ignore(event.src_path):
                        self.callback('created', event.src_path)
                def on_modified(self, event):
                    if not self.should_ignore(event.src_path):
                        self.callback('modified', event.src_path)
                def on_deleted(self, event):
                    if not self.should_ignore(event.src_path):
                        self.callback('deleted', event.src_path)
                def on_moved(self, event):
                    if not self.should_ignore(event.src_path):
                        self.callback('moved', f"{event.src_path} -> {event.dest_path}")
            # Build the event handler.
            event_handler = EventHandler(
                callback=callback,
                ignore_patterns=ignore_patterns,
                ignore_directories=ignore_directories,
                case_sensitive=case_sensitive
            )
            # Build and schedule the observer.
            observer = Observer()
            observer.schedule(event_handler, self.GetFullPath(), recursive=recursive)
            # Start watching.
            observer.start()
            if is_log:
                print(f"Started monitoring {self.GetFullPath()}")
            try:
                while True:
                    time.sleep(1)
            except KeyboardInterrupt:
                observer.stop()
                if is_log:
                    print("Stopped monitoring")
            observer.join()
        except Exception as e:
            raise FileMonitorError(f"Failed to start monitoring: {str(e)}")
def create_backup(
self,
backup_dir: Optional[str] = None,
max_backups: int = 5,
backup_format: str = 'zip',
include_metadata: bool = True
) -> Self:
"""
创建文件或目录的备份
Args:
backup_dir: 备份目录,如果为None则使用原目录下的.backup目录
max_backups: 最大保留备份数量
backup_format: 备份格式,支持'zip''tar'
include_metadata: 是否包含元数据
Returns:
备份文件对象
"""
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
try:
# 生成备份目录
if backup_dir is None:
backup_dir = os.path.join(self.GetDir(), '.backup')
backup_dir:Self = ToolFile(backup_dir)
backup_dir.MustExistsPath()
# 生成备份文件名
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
backup_name = f"{self.GetFilename()}_{timestamp}"
# 创建备份
if backup_format == 'zip':
backup_path = backup_dir | f"{backup_name}.zip"
with zipfile.ZipFile(backup_path.GetFullPath(), 'w', zipfile.ZIP_DEFLATED) as zipf:
if self.IsDir():
for root, _, files in os.walk(self.GetFullPath()):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, self.GetFullPath())
zipf.write(file_path, arcname)
else:
zipf.write(self.GetFullPath(), self.GetFilename())
elif backup_format == 'tar':
backup_path = backup_dir | f"{backup_name}.tar"
with tarfile.open(backup_path.GetFullPath(), 'w') as tarf:
if self.IsDir():
tarf.add(self.GetFullPath(), arcname=self.GetFilename())
else:
tarf.add(self.GetFullPath(), arcname=self.GetFilename())
else:
raise BackupError(f"Unsupported backup format: {backup_format}")
# 添加元数据
if include_metadata:
metadata = {
'original_path': self.GetFullPath(),
'backup_time': timestamp,
'file_size': self.get_size(),
'is_directory': self.IsDir(),
'hash': self.calculate_hash()
}
metadata_path = backup_dir | f"{backup_name}.meta.json"
with open(metadata_path.GetFullPath(), 'w') as f:
json.dump(metadata, f, indent=4)
# 清理旧备份
if max_backups > 0:
backups = backup_dir.find_file(lambda f: ToolFile(f).GetFilename().startswith(self.GetFilename() + '_'))
backups.sort(key=lambda f: f.GetFilename(), reverse=True)
for old_backup in backups[max_backups:]:
old_backup.Remove()
return backup_path
except Exception as e:
raise BackupError(f"Backup failed: {str(e)}")
def restore_backup(
self,
backup_file: Union[str, Self],
restore_path: Optional[str] = None,
verify_hash: bool = True
) -> Self:
"""
从备份恢复文件或目录
Args:
backup_file: 备份文件路径
restore_path: 恢复路径,如果为None则恢复到原位置
verify_hash: 是否验证哈希值
Returns:
恢复后的文件对象
"""
if not isinstance(backup_file, ToolFile):
backup_file:Self = ToolFile(backup_file)
if not backup_file.Exists():
raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}")
try:
# 确定恢复路径
if restore_path is None:
restore_path = self.GetFullPath()
restore_path:Self = ToolFile(restore_path)
# 解压备份
if backup_file.get_extension() == 'zip':
with zipfile.ZipFile(backup_file.GetFullPath(), 'r') as zipf:
zipf.extractall(restore_path.GetFullPath())
elif backup_file.get_extension() == 'tar':
with tarfile.open(backup_file.GetFullPath(), 'r') as tarf:
tarf.extractall(restore_path.GetFullPath())
else:
raise BackupError(f"Unsupported backup format: {backup_file.get_extension()}")
# 验证哈希值
if verify_hash:
metadata_path = backup_file.GetFullPath()[:-len(backup_file.get_extension())-1] + '.meta.json'
if os.path.exists(metadata_path):
with open(metadata_path, 'r') as f:
metadata = json.load(f)
restored_file = ToolFile(restore_path.GetFullPath())
if restored_file.calculate_hash() != metadata['hash']:
raise BackupError("Hash verification failed")
return restore_path
except Exception as e:
raise BackupError(f"Restore failed: {str(e)}")
def list_backups(self) -> List[Self]:
"""
列出所有备份
Returns:
备份文件列表
"""
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
try:
backup_dir:Self = ToolFile(os.path.join(self.GetDir(), '.backup'))
if not backup_dir.Exists():
return []
backups = backup_dir.find_file(lambda f: ToolFile(f).GetFilename().startswith(self.GetFilename() + '_'))
backups.sort(key=lambda f: ToolFile(f).GetFilename(), reverse=True)
return backups
except Exception as e:
raise BackupError(f"Failed to list backups: {str(e)}")
def get_permissions(self) -> Dict[str, bool]:
"""
获取文件或目录的权限
Returns:
权限字典,包含以下键:
- read: 是否可读
- write: 是否可写
- execute: 是否可执行
- hidden: 是否隐藏
"""
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
try:
mode = os.stat(self.GetFullPath()).st_mode
return {
'read': bool(mode & stat.S_IRUSR),
'write': bool(mode & stat.S_IWUSR),
'execute': bool(mode & stat.S_IXUSR),
'hidden': bool(os.path.isfile(self.GetFullPath()) and self.GetFilename().startswith('.'))
}
except Exception as e:
raise PermissionError(f"Failed to get permissions: {str(e)}")
    def set_permissions(
        self,
        read: Optional[bool] = None,
        write: Optional[bool] = None,
        execute: Optional[bool] = None,
        hidden: Optional[bool] = None,
        recursive: bool = False
    ) -> Self:
        """
        Set owner permission bits and the hidden attribute of this path.

        Args:
            read/write/execute: set or clear the owner bit; None = leave as-is
            hidden: on Windows toggles the file attribute; on POSIX renames the
                file to add/remove a leading dot (this changes the path!)
            recursive: also apply the r/w/x changes to files under a directory
        Returns:
            self, for chaining
        Raises:
            FileNotFoundError: when this path does not exist
            PermissionError: (the module's own class) on any failure
        """
        if not self.Exists():
            raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
        try:
            # Read the current mode bits.
            current_perms = os.stat(self.GetFullPath()).st_mode
            # Fold in the requested owner-bit changes.
            if read is not None:
                if read:
                    current_perms |= stat.S_IRUSR
                else:
                    current_perms &= ~stat.S_IRUSR
            if write is not None:
                if write:
                    current_perms |= stat.S_IWUSR
                else:
                    current_perms &= ~stat.S_IWUSR
            if execute is not None:
                if execute:
                    current_perms |= stat.S_IXUSR
                else:
                    current_perms &= ~stat.S_IXUSR
            # Apply the mode.
            os.chmod(self.GetFullPath(), current_perms)
            # Apply the hidden attribute.
            if hidden is not None:
                if os.name == 'nt': # Windows
                    import ctypes
                    # 2 == FILE_ATTRIBUTE_HIDDEN.
                    # NOTE(review): un-hiding passes 0, which is not a valid
                    # attribute set (FILE_ATTRIBUTE_NORMAL is 0x80) - confirm
                    # this actually clears the flag on Windows.
                    if hidden:
                        ctypes.windll.kernel32.SetFileAttributesW(self.GetFullPath(), 2)
                    else:
                        ctypes.windll.kernel32.SetFileAttributesW(self.GetFullPath(), 0)
                else: # Unix/Linux/Mac - dot-file convention, via rename
                    if hidden:
                        if not self.GetFilename().startswith('.'):
                            self.Rename('.' + self.GetFilename())
                    else:
                        if self.GetFilename().startswith('.'):
                            self.Rename(self.GetFilename()[1:])
            # Recursively apply the r/w/x changes to contained files.
            if recursive and self.IsDir():
                for root, _, files in os.walk(self.GetFullPath()):
                    for file in files:
                        file_path = os.path.join(root, file)
                        if read is not None:
                            if read:
                                os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IRUSR)
                            else:
                                os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IRUSR)
                        if write is not None:
                            if write:
                                os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IWUSR)
                            else:
                                os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IWUSR)
                        if execute is not None:
                            if execute:
                                os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IXUSR)
                            else:
                                os.chmod(file_path, os.stat(file_path).st_mode & ~stat.S_IXUSR)
            return self
        except Exception as e:
            raise PermissionError(f"Failed to set permissions: {str(e)}")
def is_readable(self) -> bool:
"""
检查文件是否可读
Returns:
是否可读
"""
return self.get_permissions()['read']
def is_writable(self) -> bool:
"""
检查文件是否可写
Returns:
是否可写
"""
return self.get_permissions()['write']
def is_executable(self) -> bool:
"""
检查文件是否可执行
Returns:
是否可执行
"""
return self.get_permissions()['execute']
def is_hidden(self) -> bool:
"""
检查文件是否隐藏
Returns:
是否隐藏
"""
return self.get_permissions()['hidden']