批量设置域名 dkim 脚本 - iRedMail - dkim_setup.py - ptyhon - debian
环境:debian 12.10 ,iRedMail - 1.7.3
目标:批量配置域名 dkim
步骤:
- 将 dkim_setup.py 及 domains.txt 上传到邮局服务器文件系统里,如 /root
- python3 dkim_setup.py # 执行 dkim_setup.py 脚本。脚本会从 domains.txt (可自行创建,每行一个域名) 里逐行读取邮件域名,创建 dkim 文件、将 相应的域名和 key 追加到 /etc/amavis/conf.d/50-user 文件内容中。脚本每次执行前,会对当前的 /etc/amavis/conf.d/50-user 复制备份,预防出错后需要还原使用,最后脚本会将域名及对应的解析值追加到 dkim_dns_records.json ,方便您解析域名
- amavisd testkey # 如需验证 dkim 的查询结果,可执行此命令
如需手工操作:
- amavisd-new genrsa /var/lib/dkim/iredmail.demo.anqun.org.pem 2048 # 为邮件域名 iredmail.demo.anqun.org 生成数字签名文件
- chown amavis:amavis amavisd genrsa /var/lib/dkim/iredmail.demo.anqun.org.pem # 更改签名文件属主
- chmod 0400 /var/lib/dkim/iredmail.demo.anqun.org.pem # 更改签名权限
编辑 /etc/amavis/conf.d/50-user 文件内容,将签名的邮件域名和文件等追加进去
# Add dkim_key here. dkim_key('iredmail.demo.anqun.org', 'dkim', '/var/lib/dkim/iredmail.demo.anqun.org.pem'); @dkim_signature_options_bysender_maps = ({ # 'd' defaults to a domain of an author/sender address, # 's' defaults to whatever selector is offered by a matching key # Per-domain dkim key #"domain.com" => { d => "domain.com", a => 'rsa-sha256', ttl => 10*24*3600 }, # catch-all (one dkim key for all domains) '.' => {d => 'iredmail.demo.anqun.org', a => 'rsa-sha256', c => 'relaxed/simple', ttl => 30*24*3600 }, });
dkim_setup.py 的文件内容:
#!/usr/bin/env python3
import subprocess
import os
import datetime
import json
AMAVIS_CONFIG_FILE = "/etc/amavis/conf.d/50-user"
DKIM_BASE_PATH = "/var/lib/dkim"
DKIM_KEY_SIZE = 2048 # Or 2048, if desired
AMAVIS_COMMAND = "amavisd" # Changed from amavisd-new
DNS_OUTPUT_FILE = "dkim_dns_records.json"
def generate_dkim_key(domain):
"""Generates a DKIM key for the given domain."""
key_path = os.path.join(DKIM_BASE_PATH, f"{domain}.pem")
command = [AMAVIS_COMMAND, "genrsa", key_path, str(DKIM_KEY_SIZE)]
try:
subprocess.run(command, check=True)
subprocess.run(["chown", "amavis:amavis", key_path], check=True)
subprocess.run(["chmod", "0400", key_path], check=True)
print(f"DKIM key generated for {domain} at {key_path}")
return key_path
except subprocess.CalledProcessError as e:
print(f"Error generating DKIM key for {domain}: {e}")
return None
def get_dkim_public_key(key_path):
"""Extracts the DKIM public key from the .pem file using openssl."""
try:
command = ["openssl", "rsa", "-in", key_path, "-pubout", "-outform", "PEM"]
process = subprocess.run(command, capture_output=True, text=True, check=True)
public_key = process.stdout.strip()
# Remove the BEGIN and END PUBLIC KEY lines and any newlines
public_key = public_key.replace("-----BEGIN PUBLIC KEY-----", "")
public_key = public_key.replace("-----END PUBLIC KEY-----", "")
public_key = public_key.replace("\n", "")
return public_key
except subprocess.CalledProcessError as e:
print(f"Error extracting DKIM public key using openssl: {e}")
print(f"Stderr: {e.stderr}") # Print stderr for more details
return None
except Exception as e:
print(f"Error extracting DKIM public key: {e}")
return None
def domain_exists(domain):
"""Checks if the domain is already configured in Amavis."""
try:
with open(AMAVIS_CONFIG_FILE, "r") as f:
config_content = f.read()
return domain in config_content
except FileNotFoundError:
print(f"Error: Amavis config file not found at {AMAVIS_CONFIG_FILE}")
return False
def backup_amavis_config():
"""Backs up the Amavis configuration file with a timestamp."""
timestamp = datetime.datetime.now().strftime("%Y.%m.%d.%H.%M.%S")
backup_file = f"{AMAVIS_CONFIG_FILE}.{timestamp}"
try:
subprocess.run(["cp", AMAVIS_CONFIG_FILE, backup_file], check=True)
print(f"Amavis config backed up to {backup_file}")
return True
except subprocess.CalledProcessError as e:
print(f"Error backing up Amavis config: {e}")
return False
def update_amavis_config(domain, key_path):
"""Updates the Amavis configuration file with the DKIM settings."""
try:
with open(AMAVIS_CONFIG_FILE, "r") as f:
config_lines = f.readlines()
dkim_key_insert_point = None
start_index = None
end_index = None
last_entry_index = None
for i, line in enumerate(config_lines):
if "dkim_key(" in line:
dkim_key_insert_point = i + 1
if "@dkim_signature_options_bysender_maps = (" in line:
start_index = i
if start_index is not None and "=>" in line:
# 检查是否是一个 domain entry 的开始
if line.lstrip().startswith('"') or line.lstrip().startswith("'"):
last_entry_index = i
if "});" in line and start_index is not None:
end_index = i
break
if dkim_key_insert_point is None:
print("Could not find insertion point for dkim_key.")
return False
if start_index is None or end_index is None:
print("Could not find signature options structure.")
return False
# 如果没有找到任何已有的 domain entry,则默认插入在 start_index + 1
insert_sig_index = last_entry_index + 1 if last_entry_index is not None else start_index + 1
# 构造新行
new_dkim_key_line = f"dkim_key('{domain}', 'dkim', '{key_path}');\n"
new_dkim_sig_line = f' "{domain}" => {{ d => "{domain}", a => \'rsa-sha256\', ttl => 10*24*3600 }},\n'
# 插入 dkim_key
config_lines.insert(dkim_key_insert_point, new_dkim_key_line)
# 如果不是第一个条目,检查前一行是否有逗号
if last_entry_index is not None:
prev_line = config_lines[insert_sig_index - 1]
if not prev_line.strip().endswith(','):
config_lines[insert_sig_index - 1] = prev_line.rstrip('\n') + ',\n'
# 插入新的签名选项
config_lines.insert(insert_sig_index, new_dkim_sig_line)
# 写回文件
with open(AMAVIS_CONFIG_FILE, "w") as f:
f.writelines(config_lines)
print(f"Amavis config updated for {domain}")
return True
except FileNotFoundError:
print(f"Error: Amavis config file not found at {AMAVIS_CONFIG_FILE}")
return False
except Exception as e:
print(f"Error updating Amavis config: {e}")
return False
def restart_amavis():
"""Restarts the Amavis service."""
try:
subprocess.run(["systemctl", "restart", "amavis"], check=True) # Corrected command
print("Amavis service restarted.")
return True
except subprocess.CalledProcessError as e:
print(f"Error restarting Amavis: {e}")
return False
def write_dns_record(domain, public_key):
"""Writes the DNS record to a JSON file."""
dns_record_name = f"dkim._domainkey.{domain}"
# Construct the TXT record value
txt_record_value = f"v=DKIM1; p={public_key}"
data = {dns_record_name: txt_record_value}
try:
# Check if the file exists and load existing data
if os.path.exists(DNS_OUTPUT_FILE):
with open(DNS_OUTPUT_FILE, "r") as f:
try:
existing_data = json.load(f)
except json.JSONDecodeError:
existing_data = {} # Handle empty or corrupted JSON file
else:
existing_data = {}
# Update with the new record
existing_data.update(data)
# Write back to the file
with open(DNS_OUTPUT_FILE, "w") as f:
json.dump(existing_data, f, indent=4, ensure_ascii=False) # indent for readability, disable ASCII escaping
print(f"DNS record written to {DNS_OUTPUT_FILE}")
except Exception as e:
print(f"Error writing DNS record to file: {e}")
if __name__ == "__main__":
import sys
# Backup Amavis config *before* processing any domains
if not backup_amavis_config():
print("Failed to backup Amavis config. Aborting.")
sys.exit(1)
try:
with open("domains.txt", "r") as f:
domains = [line.strip() for line in f.readlines()]
except FileNotFoundError:
print("Error: domains.txt not found")
sys.exit(1)
for domain in domains:
print(f"Processing domain: {domain}")
if domain_exists(domain):
print(f"Domain {domain} already exists in Amavis config. Skipping.")
continue
key_path = generate_dkim_key(domain)
if key_path:
if update_amavis_config(domain, key_path):
public_key = get_dkim_public_key(key_path)
if public_key:
write_dns_record(domain, public_key)
else:
print(f"Failed to get public key for {domain}")
else:
print(f"Failed to update amavis config for {domain}")
if domains: #Only restart if there was at least one domain to process
restart_amavis()
演示视频:https://www.bilibili.com/video/BV1Jj7fztEN3/
参考: