from PIL import Image, ImageDraw, ImageFont
import os
import yaml
import shutil
# ==============================================================================
# 以下是前面章节中讲解的各项加水印功能函数,请确保它们都已复制到这里
# ==============================================================================
def _apply_single_watermark(img_obj, watermark_config, img_path):
"""
根据单个水印规则给图片对象添加水印。
:param img_obj: PIL图片对象
:param watermark_config: 单个水印规则字典
:param img_path: 原始图片路径 (用于日志和错误处理)
"""
w_type = watermark_config.get("type")
if not watermark_config.get("enabled", True): # 检查是否启用
return img_obj # 未启用,返回原图
if w_type == "text":
text_content = watermark_config.get("text", "")
font_path = watermark_config.get("font_path", "arial.ttf")
font_size = watermark_config.get("font_size", 30)
text_color = tuple(watermark_config.get("text_color", [0, 0, 0, 128]))
position = watermark_config.get("position", "bottom_right")
draw = ImageDraw.Draw(img_obj)
try:
font = ImageFont.truetype(font_path, font_size)
except IOError:
print(f"⚠️ 字体文件 '{font_path}' 未找到,使用默认字体。")
font = ImageFont.load_default()
# 计算文字位置
bbox = draw.textbbox((0, 0), text_content, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x, y = 0, 0
if position == "top_left": x, y = 10, 10
elif position == "top_right": x, y = img_obj.width - text_width - 10, 10
elif position == "bottom_left": x, y = 10, img_obj.height - text_height - 10
elif position == "bottom_right": x, y = img_obj.width - text_width - 10, img_obj.height - text_height - 10
elif position == "center": x, y = (img_obj.width - text_width) // 2, (img_obj.height - text_height) // 2
draw.text((x, y), text_content, font=font, fill=text_color)
print(f" - 添加文字水印: '{text_content}'")
return img_obj # 返回处理后的图片对象
elif w_type == "image":
watermark_img_path = watermark_config.get("watermark_image_path")
position = watermark_config.get("position", "bottom_right")
opacity = watermark_config.get("opacity", 0.5)
size_ratio = watermark_config.get("size_ratio", 0.15)
if not os.path.exists(watermark_img_path):
print(f"❌ 水印图片文件不存在:{watermark_img_path}")
return img_obj
try:
watermark_obj = Image.open(watermark_img_path).convert('RGBA')
# 调整水印大小
if position == "tile": # 平铺水印
resized_watermark = watermark_obj.resize((int(watermark_obj.width * size_ratio / 5), int(watermark_obj.height * size_ratio / 5)), Image.LANCZOS)
else: # 单次水印
resized_watermark = watermark_obj.resize((int(img_obj.width * size_ratio), int(img_obj.height * size_ratio)), Image.LANCZOS)
# 调整水印透明度
alpha = resized_watermark.split()[-1]
alpha = Image.eval(alpha, lambda x: x * opacity)
resized_watermark.putalpha(alpha)
if position == "tile":
for i in range(0, img_obj.width + resized_watermark.width, resized_watermark.width + 50):
for j in range(0, img_obj.height + resized_watermark.height, resized_watermark.height + 50):
img_obj.alpha_composite(resized_watermark, (i, j))
else:
x, y = 0, 0 # 计算单次水印位置
if position == "top_left": x, y = 10, 10
elif position == "top_right": x, y = img_obj.width - resized_watermark.width - 10, 10
elif position == "bottom_left": x, y = 10, img_obj.height - resized_watermark.height - 10
elif position == "bottom_right": x, y = img_obj.width - resized_watermark.width - 10, img_obj.height - resized_watermark.height - 10
elif position == "center": x, y = (img_obj.width - resized_watermark.width) // 2, (img_obj.height - resized_watermark.height) // 2
img_obj.alpha_composite(resized_watermark, (x, y))
print(f" - 添加图片水印: '{os.path.basename(watermark_img_path)}' ({position})")
return img_obj
except Exception as e:
print(f"❌ 添加图片水印失败:{e}")
return img_obj
return img_obj
def run_watermark_processor(config_path):
"""
运行图片水印处理器,根据配置文件批量添加水印。
这是你的“图片批量加水印神器”的核心。
:param config_path: 配置文件的路径
"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
except FileNotFoundError:
print(f"❌ 配置文件未找到:{config_path}")
return
except yaml.YAMLError as e:
print(f"❌ 配置文件解析错误:{e}")
return
source_dir = os.path.expanduser(config.get("source_directory"))
output_dir = os.path.expanduser(config.get("output_directory"))
if not os.path.exists(source_dir):
print(f"❌ 源文件夹不存在:{source_dir}")
return
os.makedirs(output_dir, exist_ok=True)
print(f"\n🚀 正在启动图片水印处理器,源目录:'{source_dir}'")
print(f"📦 处理后的图片将保存到:'{output_dir}'\n")
files_to_process = [f for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f))]
if not files_to_process:
print("ℹ️ 源文件夹中没有找到图片文件可供处理。")
print("✨ 图片水印处理完成!")
return
for filename in files_to_process:
source_path = os.path.join(source_dir, filename)
output_path = os.path.join(output_dir, filename)
img = None
try:
img = Image.open(source_path).convert("RGBA") # 确保是RGBA模式
print(f"--- 正在处理图片:'{filename}' ---")
for watermark_rule in config.get("watermark_rules", []):
img = _apply_single_watermark(img, watermark_rule, source_path) # 应用水印规则
# 保存最终图片
# 处理重名
base_dest_name, dest_ext = os.path.splitext(output_path)
counter = 1
while os.path.exists(output_path):
output_path = f"{base_dest_name}({counter}){dest_ext}"
counter += 1
# 根据原始文件的格式或需求来保存
if filename.lower().endswith('.png'):
img.save(output_path, quality=90) # PNG保存时也可指定质量
elif filename.lower().endswith(('.jpg', '.jpeg')):
img.save(output_path, quality=90) # JPG保存时指定质量
else: # 其他格式默认保存为PNG
img.save(os.path.splitext(output_path)[0] + ".png")
print(f" ✅ 图片水印处理完成,保存到:'{os.path.basename(output_path)}'")
except Exception as e:
print(f"❌ 处理文件 '{filename}' 失败:{e}")
finally:
if img: img.close() # 确保关闭图片文件
print("\n✨ 所有图片水印处理完成!")
if __name__ == "__main__":
# 确保 config_watermark.yaml 在同一目录下
config_file_path = os.path.join(os.path.dirname(__file__), "config_watermark.yaml")