Python实用工具脚本汇总

Python实用工具脚本汇总
寒霜Python实用工具脚本汇总
在日常开发中,我们经常需要编写一些自动化脚本来提高工作效率。本文汇总了几个实用的Python工具脚本,包括文件解压、Git仓库管理等。
1. 递归解压压缩包工具
1.1 功能特点
- 支持多种压缩格式:ZIP、TAR、TAR.GZ、TAR.BZ2、GZ
- 递归解压嵌套的压缩包
- 自动生成解压目录
- 支持对特定文件类型(如.log)的特殊处理
1.2 完整代码实现
import os
import zipfile
import tarfile
import gzip
import shutil
from pathlib import Path
def ensure_dir_exists(directory):
"""确保目录存在,不存在则创建"""
if not os.path.exists(directory):
os.makedirs(directory)
def get_extract_path(file_path):
"""根据压缩包名称生成解压目录路径"""
extract_dir = os.path.splitext(os.path.basename(file_path))[0]
extract_to = os.path.join(os.path.dirname(file_path), extract_dir)
return extract_to
def unzip_file(file_path, extract_to, handle_log=False):
"""
解压.zip文件到指定目录下
:param file_path: 压缩文件路径
:param extract_to: 解压目标目录
:param handle_log: 是否对.log文件进行特殊处理
"""
ensure_dir_exists(extract_to)
with zipfile.ZipFile(file_path, 'r') as zip_ref:
for member in zip_ref.infolist():
if handle_log and member.filename.endswith('.log'):
# 将 .log 文件直接复制到目标目录
log_path = os.path.join(extract_to, member.filename)
ensure_dir_exists(os.path.dirname(log_path))
with zip_ref.open(member) as source, open(log_path, "wb") as target:
shutil.copyfileobj(source, target)
print(f'Copied .log file: {member.filename} to {log_path}')
else:
zip_ref.extract(member, extract_to)
print(f'Extracted: {member.filename}')
def untar_file(file_path, extract_to, handle_log=False):
"""
解压.tar, .tar.gz, .tar.bz2文件到指定目录下
:param file_path: 压缩文件路径
:param extract_to: 解压目标目录
:param handle_log: 是否对.log文件进行特殊处理
"""
ensure_dir_exists(extract_to)
mode = 'r'
if file_path.endswith('.gz'):
mode = 'r:gz'
elif file_path.endswith('.bz2'):
mode = 'r:bz2'
with tarfile.open(file_path, mode) as tar_ref:
for member in tar_ref.getmembers():
if handle_log and member.name.endswith('.log'):
# 将 .log 文件直接复制到目标目录
log_path = os.path.join(extract_to, member.name)
ensure_dir_exists(os.path.dirname(log_path))
with tar_ref.extractfile(member) as source, open(log_path, "wb") as target:
shutil.copyfileobj(source, target)
print(f'Copied .log file: {member.name} to {log_path}')
else:
tar_ref.extract(member, extract_to)
print(f'Extracted: {member.name}')
def extract_gz_file(file_path, extract_to):
"""
解压单独的.gz文件
:param file_path: 压缩文件路径
:param extract_to: 解压目标目录
"""
output_name = Path(file_path).stem # 自动去除.gz后缀
output_path = os.path.join(extract_to, output_name)
ensure_dir_exists(extract_to)
with gzip.open(file_path, 'rb') as f_in:
with open(output_path, 'wb') as f_out:
f_out.write(f_in.read())
print(f"解压GZ文件: {file_path} -> {output_path}")
return output_path
def extract_nested_archives(archive_path, extract_to=None, processed=None, handle_log=False):
"""
递归解压嵌套的压缩文件
:param archive_path: 要解压的压缩文件路径
:param extract_to: 解压目录
:param processed: 已处理的文件集合(防止重复处理)
:param handle_log: 是否对.log文件进行特殊处理
"""
if processed is None:
processed = set()
archive_path = os.path.abspath(archive_path)
if archive_path in processed:
return
processed.add(archive_path)
if extract_to is None:
extract_to = os.path.join(
os.path.dirname(archive_path),
Path(archive_path).stem + "_extracted"
)
ensure_dir_exists(extract_to)
try:
# 处理ZIP文件
if zipfile.is_zipfile(archive_path):
unzip_file(archive_path, extract_to, handle_log)
# 处理TAR文件(包括tar.gz/tar.bz2)
elif tarfile.is_tarfile(archive_path):
untar_file(archive_path, extract_to, handle_log)
# 处理单独.gz文件
elif archive_path.endswith('.gz') and not tarfile.is_tarfile(archive_path):
output_path = extract_gz_file(archive_path, extract_to)
# 递归处理新解压的文件
extract_nested_archives(output_path, processed=processed, handle_log=handle_log)
else:
print(f"不支持的文件格式: {archive_path}")
return
# 递归处理解压目录中的所有文件
for root, _, files in os.walk(extract_to):
for file in files:
file_path = os.path.join(root, file)
if (file_path.endswith(('.zip', '.tar', '.tar.gz', '.tar.bz2', '.gz')) and
file_path not in processed):
extract_nested_archives(file_path, processed=processed, handle_log=handle_log)
except Exception as e:
print(f"解压失败: {archive_path} - {str(e)}")
def extract_all_in_archive(file_path, extract_to=None, handle_log=False):
"""
递归解压压缩包内的所有压缩包
:param file_path: 压缩包路径
:param extract_to: 解压目录
:param handle_log: 是否对.log文件进行特殊处理
"""
if extract_to is None:
extract_to = get_extract_path(file_path)
# 初次解压
if os.path.isfile(file_path):
extract_nested_archives(file_path, extract_to, handle_log=handle_log)
if __name__ == "__main__":
# 交互式使用
print("=== Python递归解压工具 ===")
print("功能:支持ZIP、TAR、TAR.GZ、TAR.BZ2、GZ格式的递归解压")
file_path = input("请输入要解压的文件路径: ").strip().strip('"')
if not os.path.isfile(file_path):
print(f"错误:文件不存在 - {file_path}")
else:
handle_log = input("是否对.log文件进行特殊处理?(y/n): ").strip().lower() == 'y'
print("\n开始解压...")
extract_all_in_archive(file_path, handle_log=handle_log)
print("解压完成!")
# 打印解压目录
extract_dir = get_extract_path(file_path)
print(f"解压目录: {os.path.abspath(extract_dir)}")
1.3 使用示例
# 示例1:简单解压
extract_all_in_archive(r'C:\Users\example\archive.zip')
# 示例2:解压并处理.log文件
extract_all_in_archive(r'C:\Users\example\archive.zip', handle_log=True)
# 示例3:指定解压目录
extract_nested_archives(
r'C:\Users\example\archive.zip',
extract_to=r'C:\Users\example\output'
)
1.4 支持的格式
| 格式 | 说明 | 示例 |
|---|---|---|
| ZIP | 常用压缩格式 | archive.zip |
| TAR | Linux常用格式 | archive.tar |
| TAR.GZ | Gzip压缩的TAR | archive.tar.gz |
| TAR.BZ2 | Bzip2压缩的TAR | archive.tar.bz2 |
| GZ | 单个Gzip压缩文件 | file.txt.gz |
2. Git仓库状态检查工具
2.1 功能特点
- 遍历指定目录下的所有Git仓库
- 检查每个仓库是否有未提交的更改
- 批量检查多个项目的工作状态
2.2 完整代码实现
import os
import subprocess
from pathlib import Path
def check_git_status(folder):
"""
检查Git仓库的状态
:param folder: Git仓库路径
:return: 未提交的更改信息
"""
try:
# 执行`git status --porcelain`命令
# 如果所有的更改都已提交,则输出应为空
result = subprocess.run(
['git', 'status', '--porcelain'],
cwd=folder,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result.stdout.strip()
except Exception as e:
return f"Error: {str(e)}"
def get_git_branch(folder):
"""
获取当前Git分支
:param folder: Git仓库路径
:return: 当前分支名
"""
try:
result = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
cwd=folder,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result.stdout.strip()
except:
return "unknown"
def get_git_remote_url(folder):
"""
获取Git远程仓库URL
:param folder: Git仓库路径
:return: 远程仓库URL
"""
try:
result = subprocess.run(
['git', 'config', '--get', 'remote.origin.url'],
cwd=folder,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result.stdout.strip()
except:
return "unknown"
def walk_and_check(folder, show_clean=False):
"""
遍历目录并检查所有Git仓库的状态
:param folder: 要遍历的根目录
:param show_clean: 是否显示已提交的仓库
"""
print(f"\n开始扫描目录: {folder}")
print("=" * 80)
repo_count = 0
uncommitted_count = 0
for root, dirs, _ in os.walk(folder):
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
# 检查文件夹是否包含.git目录
if os.path.exists(os.path.join(dir_path, '.git')):
repo_count += 1
status = check_git_status(dir_path)
branch = get_git_branch(dir_path)
if status:
# 有未提交的更改
uncommitted_count += 1
print(f"\n❌ 未提交的更改: {dir_path}")
print(f" 分支: {branch}")
# 格式化输出更改内容
for line in status.split('\n'):
if line:
status_code = line[:2]
file_path = line[3:]
status_symbol = {
'M': '修改',
'A': '新增',
'D': '删除',
'R': '重命名',
'C': '复制',
'??': '未跟踪'
}
print(f" [{status_code}] {status_symbol.get(status_code, '未知')}: {file_path}")
elif show_clean:
# 所有更改都已提交
remote_url = get_git_remote_url(dir_path)
print(f"\n✅ 已提交: {dir_path}")
print(f" 分支: {branch}")
print(f" 远程: {remote_url}")
print("\n" + "=" * 80)
print(f"扫描完成!")
print(f"总共找到 {repo_count} 个Git仓库")
print(f"其中 {uncommitted_count} 个仓库有未提交的更改")
if __name__ == "__main__":
print("=== Git仓库状态批量检查工具 ===\n")
# 默认目录
default_folder = r'E:\Projects'
# 获取用户输入
folder = input(f"请输入要扫描的目录 (默认: {default_folder}): ").strip()
if not folder:
folder = default_folder
if not os.path.exists(folder):
print(f"错误:目录不存在 - {folder}")
else:
show_clean = input("是否显示已提交的仓库?(y/n): ").strip().lower() == 'y'
walk_and_check(folder, show_clean=show_clean)
2.3 使用示例
# 示例1:检查指定目录
walk_and_check('E:\\Projects')
# 示例2:检查并显示已提交的仓库
walk_and_check('E:\\Projects', show_clean=True)
# 示例3:检查单个仓库
status = check_git_status('E:\\Projects\\my-project')
if status:
print("有未提交的更改")
else:
print("所有更改已提交")
2.4 输出示例
开始扫描目录: E:\Projects
================================================================================
❌ 未提交的更改: E:\Projects\project-a
分支: feature/new-feature
[ M ] 修改: src/main.py
[A ] 新增: src/utils.py
[??] 未跟踪: config.ini
❌ 未提交的更改: E:\Projects\project-b
分支: main
[ D ] 删除: old_file.py
================================================================================
扫描完成!
总共找到 15 个Git仓库
其中 2 个仓库有未提交的更改
3. 批量重命名工具
3.1 功能特点
- 批量重命名文件
- 支持正则表达式匹配
- 支持递归处理子目录
3.2 代码实现
import os
import re
from pathlib import Path
def batch_rename_files(directory, pattern, replacement, recursive=False, dry_run=True):
"""
批量重命名文件
:param directory: 目录路径
:param pattern: 正则表达式模式
:param replacement: 替换字符串
:param recursive: 是否递归处理子目录
:param dry_run: 是否为试运行(不实际重命名)
"""
count = 0
if recursive:
# 递归遍历
for root, _, files in os.walk(directory):
for filename in files:
if re.search(pattern, filename):
old_path = os.path.join(root, filename)
new_filename = re.sub(pattern, replacement, filename)
new_path = os.path.join(root, new_filename)
if dry_run:
print(f"[试运行] {filename} -> {new_filename}")
else:
os.rename(old_path, new_path)
print(f"[重命名] {filename} -> {new_filename}")
count += 1
else:
# 只处理当前目录
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path) and re.search(pattern, filename):
new_filename = re.sub(pattern, replacement, filename)
new_path = os.path.join(directory, new_filename)
if dry_run:
print(f"[试运行] {filename} -> {new_filename}")
else:
os.rename(file_path, new_path)
print(f"[重命名] {filename} -> {new_filename}")
count += 1
if dry_run:
print(f"\n试运行完成:将重命名 {count} 个文件")
confirm = input("确认执行重命名?(y/n): ").strip().lower()
if confirm == 'y':
batch_rename_files(directory, pattern, replacement, recursive, dry_run=False)
else:
print(f"\n重命名完成:共处理 {count} 个文件")
# 使用示例
if __name__ == "__main__":
# 示例:将所有空格替换为下划线
batch_rename_files(
directory='./files',
pattern=r'\s+',
replacement='_',
recursive=True,
dry_run=True
)
4. 文件去重工具
4.1 功能特点
- 根据文件内容(MD5)检测重复文件
- 支持按文件大小筛选
- 生成重复文件报告
4.2 代码实现
import os
import hashlib
from collections import defaultdict
def calculate_md5(filepath):
"""计算文件的MD5值"""
hash_md5 = hashlib.md5()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def find_duplicates(directory, min_size=0):
"""
查找重复文件
:param directory: 要扫描的目录
:param min_size: 最小文件大小(字节)
:return: 重复文件字典 {md5: [file_list]}
"""
files_by_md5 = defaultdict(list)
for root, _, files in os.walk(directory):
for filename in files:
filepath = os.path.join(root, filename)
filesize = os.path.getsize(filepath)
# 跳过小于指定大小的文件
if filesize < min_size:
continue
try:
file_md5 = calculate_md5(filepath)
files_by_md5[file_md5].append(filepath)
except Exception as e:
print(f"Error processing {filepath}: {e}")
# 只返回有重复的文件
duplicates = {
md5: files for md5, files in files_by_md5.items() if len(files) > 1
}
return duplicates
if __name__ == "__main__":
directory = input("请输入要扫描的目录: ").strip()
min_size = int(input("请输入最小文件大小(字节,0=不限制): ") or "0")
print(f"\n正在扫描 {directory}...")
duplicates = find_duplicates(directory, min_size)
if not duplicates:
print("未发现重复文件")
else:
total_duplicates = sum(len(files) - 1 for files in duplicates.values())
print(f"\n发现 {len(duplicates)} 组重复文件,共 {total_duplicates} 个重复:\n")
for md5, files in duplicates.items():
print(f"MD5: {md5}")
for file in files:
size = os.path.getsize(file)
print(f" - {file} ({size} bytes)")
print()
5. 总结
本文汇总了4个实用的Python工具脚本:
- 递归解压工具 - 处理各种格式的压缩包
- Git状态检查工具 - 批量检查多个仓库的状态
- 批量重命名工具 - 批量重命名文件
- 文件去重工具 - 查找重复文件
这些脚本可以直接使用,也可以根据实际需求进行修改。Python的强大之处在于能够快速实现各种自动化任务,大大提高工作效率。