批量设置域名 dkim 脚本 - iRedMail - dkim_setup.py - python - debian

环境:debian 12.10 ,iRedMail - 1.7.3
目标:批量配置域名 dkim

步骤:

  1. 将 dkim_setup.py 及 domains.txt 上传到邮局服务器文件系统里,如 /root
  2. python3 dkim_setup.py # 执行 dkim_setup.py 脚本。脚本会从 domains.txt (可自行创建,每行一个域名) 里逐行读取邮件域名,为每个域名创建 dkim 密钥文件,并将相应的域名和 key 追加到 /etc/amavis/conf.d/50-user 文件内容中。脚本每次执行前,会先对当前的 /etc/amavis/conf.d/50-user 复制备份 (文件名带时间戳),以便出错后还原;处理完成后脚本会重启 amavis 服务,并将域名及对应的解析值写入 dkim_dns_records.json ,方便您解析域名
  3. amavisd testkey # 如需验证 dkim 的查询结果,可执行此命令

如需手工操作:

  1. amavisd genrsa /var/lib/dkim/iredmail.demo.anqun.org.pem 2048 # 为邮件域名 iredmail.demo.anqun.org 生成数字签名文件 (在较旧的系统上,该命令可能名为 amavisd-new)
  2. chown amavis:amavis /var/lib/dkim/iredmail.demo.anqun.org.pem # 更改签名文件属主
  3. chmod 0400 /var/lib/dkim/iredmail.demo.anqun.org.pem # 更改签名文件权限 (仅属主可读)
  4. 编辑 /etc/amavis/conf.d/50-user 文件内容,将签名的邮件域名和文件等追加进去

    # Add dkim_key here.
    dkim_key('iredmail.demo.anqun.org', 'dkim', '/var/lib/dkim/iredmail.demo.anqun.org.pem');
    
    @dkim_signature_options_bysender_maps = ({
     # 'd' defaults to a domain of an author/sender address,
     # 's' defaults to whatever selector is offered by a matching key
    
     # Per-domain dkim key
     #"domain.com"  => { d => "domain.com", a => 'rsa-sha256', ttl => 10*24*3600 },
    
     # catch-all (one dkim key for all domains)
     '.' => {d => 'iredmail.demo.anqun.org',
             a => 'rsa-sha256',
             c => 'relaxed/simple',
             ttl => 30*24*3600 },
    });

dkim_setup.py 的文件内容:

#!/usr/bin/env python3

import subprocess
import os
import datetime
import json

# Amavis configuration file that the script reads, backs up and appends DKIM settings to.
AMAVIS_CONFIG_FILE = "/etc/amavis/conf.d/50-user"
# Directory in which the per-domain private keys are created as "<domain>.pem".
DKIM_BASE_PATH = "/var/lib/dkim"
DKIM_KEY_SIZE = 2048  # Key size passed to "genrsa"
AMAVIS_COMMAND = "amavisd"  # Executable used for "genrsa" (this script previously used "amavisd-new")
# JSON file (relative to the current working directory) collecting the DNS TXT records to publish.
DNS_OUTPUT_FILE = "dkim_dns_records.json"

def generate_dkim_key(domain):
    """Generate a DKIM private key for ``domain`` and restrict access to it.

    Runs ``<AMAVIS_COMMAND> genrsa`` to create ``<DKIM_BASE_PATH>/<domain>.pem``
    with ``DKIM_KEY_SIZE`` bits, then changes the owner to ``amavis:amavis``
    and the mode to ``0400``.

    Args:
        domain: Mail domain the key is for; also used as the key file name.

    Returns:
        The path of the key file, or ``None`` when one of the commands exited
        with a non-zero status or could not be started at all.
    """
    key_path = os.path.join(DKIM_BASE_PATH, f"{domain}.pem")
    commands = [
        [AMAVIS_COMMAND, "genrsa", key_path, str(DKIM_KEY_SIZE)],
        ["chown", "amavis:amavis", key_path],
        ["chmod", "0400", key_path],
    ]
    try:
        for command in commands:
            subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError (e.g. FileNotFoundError) is raised when the executable is
        # missing or not runnable. Report it like any other failure so one bad
        # domain cannot abort the whole batch with a traceback.
        print(f"Error generating DKIM key for {domain}: {e}")
        return None

    print(f"DKIM key generated for {domain} at {key_path}")
    return key_path

def get_dkim_public_key(key_path):
    """Return the public key of a private key file as one unbroken base64 string.

    The key is exported with ``openssl rsa -pubout``; the PEM header, footer
    and all line breaks are then removed.

    Args:
        key_path: Path to the private key (.pem) file.

    Returns:
        The public key text without PEM armour, or ``None`` when openssl
        fails or cannot be run.
    """
    openssl_args = ["openssl", "rsa", "-in", key_path, "-pubout", "-outform", "PEM"]
    try:
        result = subprocess.run(openssl_args, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error extracting DKIM public key using openssl: {e}")
        print(f"Stderr: {e.stderr}")  # openssl explains the failure on stderr
        return None
    except Exception as e:
        print(f"Error extracting DKIM public key: {e}")
        return None

    # Drop the PEM armour lines and join the base64 body into a single line.
    key_text = result.stdout.strip()
    for unwanted in ("-----BEGIN PUBLIC KEY-----", "-----END PUBLIC KEY-----", "\n"):
        key_text = key_text.replace(unwanted, "")
    return key_text


def domain_exists(domain, config_file=None):
    """Check whether ``domain`` is already mentioned in the Amavis config.

    The domain must appear as a complete quoted string (``'domain'`` or
    ``"domain"``), compared case-insensitively. A plain substring test would
    wrongly report ``example.com`` as configured when only ``myexample.com``
    or a key path such as ``/var/lib/dkim/example.com.pem`` is present.

    Args:
        domain: Mail domain to look for.
        config_file: Config file to inspect; defaults to ``AMAVIS_CONFIG_FILE``.

    Returns:
        True if the domain is found, False if it is not or if the config file
        does not exist. An empty ``domain`` yields True (when the file exists)
        so that callers skip blank input instead of configuring it.
    """
    path = AMAVIS_CONFIG_FILE if config_file is None else config_file
    try:
        with open(path, "r") as f:
            config_content = f.read()
    except FileNotFoundError:
        print(f"Error: Amavis config file not found at {path}")
        return False

    if not domain:
        # Matches the earlier substring behaviour, where "" was always "found".
        return True

    haystack = config_content.lower()
    needle = domain.lower()
    return f"'{needle}'" in haystack or f'"{needle}"' in haystack

def backup_amavis_config():
    """Copy the Amavis config to a sibling file named with the current timestamp.

    Returns:
        True when ``cp`` succeeds, False when it exits with a non-zero status.
    """
    stamp = datetime.datetime.now().strftime("%Y.%m.%d.%H.%M.%S")
    destination = AMAVIS_CONFIG_FILE + "." + stamp
    copy_command = ["cp", AMAVIS_CONFIG_FILE, destination]

    try:
        subprocess.run(copy_command, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error backing up Amavis config: {e}")
        return False

    print(f"Amavis config backed up to {destination}")
    return True

def _find_dkim_insert_points(config_lines):
    """Locate where a new dkim_key() line and a new signature-map entry belong.

    Returns a ``(key_index, entry_index)`` tuple of indices usable with
    ``list.insert``. An element is ``None`` when its anchor was not found.
    """
    key_index = None
    map_start = None
    for i, line in enumerate(config_lines):
        if "dkim_key(" in line:
            # Keep overwriting so the new key follows the last existing one.
            key_index = i + 1
        if map_start is None and "@dkim_signature_options_bysender_maps = (" in line:
            map_start = i

    if map_start is None:
        return key_index, None

    entry_index = None
    for i in range(map_start, len(config_lines)):
        code = config_lines[i].split("#", 1)[0].rstrip()  # ignore comments
        if "{" not in code:
            if ")" in code:
                break  # the list was closed without ever opening a hash
            continue
        # Only a line that ends with the map's own "{" is a safe anchor. If
        # anything else shares the line we cannot tell where an entry starts.
        if code.endswith("{") and code.count("{") == 1:
            entry_index = i + 1
        break

    # Require the structure to be closed, otherwise the file is not in the
    # expected shape and is better left untouched.
    if entry_index is not None and not any(
        "});" in line for line in config_lines[entry_index:]
    ):
        entry_index = None
    return key_index, entry_index


def update_amavis_config(domain, key_path, config_file=None):
    """Add the DKIM key and per-domain signing options for ``domain``.

    Two lines are inserted into the Amavis config:

    * a ``dkim_key(...)`` line directly after the last existing one, and
    * a ``"domain" => { ... }`` entry directly after the opening ``{`` of
      ``@dkim_signature_options_bysender_maps``.

    The entry is placed at the top of the map because that position is always
    at the outer level of the hash. Existing lines are never modified, so no
    comma fix-up is needed and a multi-line entry such as the catch-all ``'.'``
    cannot be split.

    Args:
        domain: Mail domain to configure.
        key_path: Path of the private key file for the domain.
        config_file: Config file to edit; defaults to ``AMAVIS_CONFIG_FILE``.

    Returns:
        True when the file was rewritten, False when it could not be read or
        written or when the expected anchors were not found (the file is then
        left unchanged).
    """
    path = AMAVIS_CONFIG_FILE if config_file is None else config_file

    try:
        with open(path, "r") as f:
            config_lines = f.readlines()
    except FileNotFoundError:
        print(f"Error: Amavis config file not found at {path}")
        return False
    except (OSError, UnicodeError) as e:
        print(f"Error updating Amavis config: {e}")
        return False

    key_index, entry_index = _find_dkim_insert_points(config_lines)
    if key_index is None:
        print("Could not find insertion point for dkim_key.")
        return False
    if entry_index is None:
        print("Could not find signature options structure.")
        return False

    # If the last dkim_key line is also the last line of a file without a
    # trailing newline, terminate it so the new line is not glued onto it.
    if not config_lines[key_index - 1].endswith("\n"):
        config_lines[key_index - 1] += "\n"

    new_dkim_key_line = f"dkim_key('{domain}', 'dkim', '{key_path}');\n"
    new_dkim_sig_line = f'    "{domain}" => {{ d => "{domain}", a => \'rsa-sha256\', ttl => 10*24*3600 }},\n'

    # Insert at the higher index first: inserting shifts everything after it,
    # so doing the lower index first would invalidate the other position.
    insertions = sorted(
        [(key_index, new_dkim_key_line), (entry_index, new_dkim_sig_line)],
        reverse=True,
    )
    for index, text in insertions:
        config_lines.insert(index, text)

    try:
        with open(path, "w") as f:
            f.writelines(config_lines)
    except OSError as e:
        print(f"Error updating Amavis config: {e}")
        return False

    print(f"Amavis config updated for {domain}")
    return True

def restart_amavis():
    """Restart the ``amavis`` systemd unit so configuration changes take effect.

    Returns:
        True when ``systemctl restart amavis`` succeeds, False when it exits
        with a non-zero status.
    """
    restart_command = ["systemctl", "restart", "amavis"]
    try:
        subprocess.run(restart_command, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error restarting Amavis: {e}")
        return False

    print("Amavis service restarted.")
    return True

def write_dns_record(domain, public_key):
    """Store the DKIM TXT record for ``domain`` in the JSON output file.

    The record name is ``dkim._domainkey.<domain>`` and its value is
    ``v=DKIM1; p=<public_key>``. Records already in the file are kept; a file
    that cannot be parsed as JSON is treated as empty. Any error is printed
    rather than raised.

    Args:
        domain: Mail domain the record belongs to.
        public_key: Public key text to publish in the ``p=`` tag.
    """
    new_record = {f"dkim._domainkey.{domain}": f"v=DKIM1; p={public_key}"}

    try:
        records = {}
        if os.path.exists(DNS_OUTPUT_FILE):
            with open(DNS_OUTPUT_FILE, "r") as source:
                try:
                    records = json.load(source)
                except json.JSONDecodeError:
                    # Empty or corrupted file: start over with no records.
                    records = {}

        records.update(new_record)

        # Pretty-print and keep non-ASCII characters readable.
        with open(DNS_OUTPUT_FILE, "w") as target:
            json.dump(records, target, indent=4, ensure_ascii=False)

        print(f"DNS record written to {DNS_OUTPUT_FILE}")
    except Exception as e:
        print(f"Error writing DNS record to file: {e}")

def main():
    """Configure DKIM for every domain listed in ``domains.txt``.

    For each domain that is not yet in the Amavis config, a key is generated,
    the config is updated and the DNS record is written to the JSON file.
    Amavis is restarted once at the end, and only if the config was changed.

    Returns:
        The process exit status: 1 when ``domains.txt`` is missing or the
        Amavis config could not be backed up, otherwise 0.
    """
    try:
        with open("domains.txt", "r") as f:
            entries = [line.strip() for line in f]
    except FileNotFoundError:
        print("Error: domains.txt not found")
        return 1

    # Blank lines and "#" comment lines are not domains. Passing them on would
    # create a key file named ".pem" or a bogus config entry.
    domains = [entry for entry in entries if entry and not entry.startswith("#")]
    if not domains:
        print("No domains found in domains.txt. Nothing to do.")
        return 0

    # Back up the Amavis config *before* it is modified for any domain.
    if not backup_amavis_config():
        print("Failed to backup Amavis config.  Aborting.")
        return 1

    updated = 0
    for domain in domains:
        print(f"Processing domain: {domain}")

        if domain_exists(domain):
            print(f"Domain {domain} already exists in Amavis config. Skipping.")
            continue

        key_path = generate_dkim_key(domain)
        if not key_path:
            continue  # generate_dkim_key already reported the error

        if not update_amavis_config(domain, key_path):
            print(f"Failed to update amavis config for {domain}")
            continue
        updated += 1

        public_key = get_dkim_public_key(key_path)
        if public_key:
            write_dns_record(domain, public_key)
        else:
            print(f"Failed to get public key for {domain}")

    # Restarting interrupts mail filtering, so only do it when the config
    # actually changed.
    if updated:
        restart_amavis()
    return 0


if __name__ == "__main__":
    import sys

    sys.exit(main())

演示视频:https://www.bilibili.com/video/BV1Jj7fztEN3/

参考:

标签: iredmail, python, dkim

添加新评论