Python申请Let's Encrypt SSL证书完整指南:虚拟主机用户的可行方案

核心答案:虚拟主机用户可通过Python脚本调用ACME协议与DNS服务商API,在本地完成DNS-01验证并获取SSL证书,再手动上传至虚拟主机。该方案无需服务器SSH权限,支持多域名与通配符证书,且完全免费。

python申请Let's Encrypt SSL证书原文件目录展示

python申请Let's Encrypt R12证书


一、方案概述与适用场景

1.1 为什么选择DNS-01验证

虚拟主机通常不提供SSH访问和自动化部署能力,传统的HTTP-01验证(需在服务器放置验证文件)难以实施。DNS-01验证通过向DNS添加TXT记录完成域名所有权证明,更适合虚拟主机场景。

验证方式适用场景虚拟主机友好度通配符支持技术复杂度
HTTP-01自有服务器低(需文件上传权限)不支持
DNS-01任意托管环境高(仅需DNS管理权限)支持
TLS-ALPN-01自有服务器不支持

DNS-01的核心优势在于验证过程完全通过DNS完成,与服务器托管方式解耦。您只需拥有DNS管理权限,即可在本地计算机运行脚本完成证书申请。

1.2 工具工作原理

本工具(EasyCert 2.0)采用分层架构:

  1. CLI层:解析命令行参数与JSON配置文件

  2. DNS Provider层:对接各平台API,自动添加/删除TXT记录

  3. ACME执行层:与Let's Encrypt交互,完成挑战验证与证书签发

  4. 证书输出层:生成标准格式文件(privkey.key、fullchain.pem等)

标准操作流程

本地运行脚本 → 连接DNS服务商API → 添加TXT记录 → 等待DNS传播 
→ 通知Let's Encrypt验证 → 签发证书 → 删除TXT记录 → 输出证书文件

二、Python原代码、环境准备与安装

2.0 Python原代码

import argparse
import base64
import hashlib
import hmac
import json
import time
import uuid
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Mapping, cast

import requests
from acme import client, messages
from acme import challenges as acme_challenges
from acme import errors as acme_errors
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.utils import CryptographyDeprecationWarning
from cryptography.x509.oid import NameOID
from dns import resolver
from josepy.jwk import JWKEC, JWKRSA

LE_PRODUCTION_DIRECTORY = "https://acme-v02.api.letsencrypt.org/directory"
LE_STAGING_DIRECTORY = "https://acme-staging-v02.api.letsencrypt.org/directory"
STATE_FILE = ".easycert_state.json"
JSONDict = dict[str, Any]
PrivateKey = rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey

# 定向忽略 cryptography 关于 naïve datetime 的弃用提示,避免影响 CLI 输出。
warnings.filterwarnings(
    "ignore",
    category=CryptographyDeprecationWarning,
    message=r"Properties that return a naïve datetime object have been deprecated.*",
)


def _now_utc() -> datetime:
    return datetime.now(timezone.utc)


def _split_domain_labels(name: str) -> list[str]:
    return [x for x in name.strip(".").split(".") if x]


def _normalize_domains(domains: list[str]) -> list[str]:
    norm: list[str] = []
    for d in domains:
        d = d.strip().lower().rstrip(".")
        if not d:
            continue
        norm.append(d)
    unique = sorted(set(norm), key=norm.index)
    if not unique:
        raise ValueError("域名列表不能为空。")
    if len(unique) > 100:
        raise ValueError("单张证书最多支持 100 个域名(Let's Encrypt 限制)。")
    return unique


def _base_domain_for_rate_limit(domain: str) -> str:
    d = domain[2:] if domain.startswith("*.") else domain
    labels = _split_domain_labels(d)
    if len(labels) < 2:
        return d
    return ".".join(labels[-2:])


def _load_json(path: Path) -> JSONDict:
    if not path.exists():
        return {}
    with path.open("r", encoding="utf-8") as f:
        return cast(JSONDict, json.load(f))


def _save_json(path: Path, data: Mapping[str, Any]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


class DNSProvider(ABC):
    @abstractmethod
    def create_txt_record(self, fqdn: str, value: str) -> str:
        raise NotImplementedError

    @abstractmethod
    def delete_txt_record(self, record_ref: str) -> None:
        raise NotImplementedError


class CloudflareDNSProvider(DNSProvider):
    def __init__(self, api_token: str):
        self.api_token = api_token
        self.base = "https://api.cloudflare.com/client/v4"
        self.headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        }

    def _request(self, method: str, path: str, **kwargs: Any) -> JSONDict:
        url = f"{self.base}{path}"
        resp = requests.request(method, url, headers=self.headers, timeout=30, **kwargs)
        data = cast(JSONDict, resp.json())
        if resp.status_code >= 400 or not data.get("success", False):
            raise RuntimeError(f"Cloudflare API 失败: {data}")
        return data

    def _find_zone(self, fqdn: str) -> tuple[str, str]:
        labels = _split_domain_labels(fqdn)
        for i in range(len(labels) - 1):
            zone_name = ".".join(labels[i:])
            data = self._request("GET", f"/zones?name={zone_name}&status=active")
            result_raw = cast(list[JSONDict], data.get("result", []))
            for item in result_raw:
                zone_id = cast(str | None, item.get("id"))
                if isinstance(zone_id, str):
                    return zone_id, zone_name
        raise RuntimeError(f"Cloudflare 未找到可用 Zone: {fqdn}")

    def create_txt_record(self, fqdn: str, value: str) -> str:
        zone_id, _ = self._find_zone(fqdn)
        payload: dict[str, str | int] = {
            "type": "TXT",
            "name": fqdn,
            "content": value,
            "ttl": 120,
        }
        data = self._request("POST", f"/zones/{zone_id}/dns_records", json=payload)
        rid = data["result"]["id"]
        return f"{zone_id}:{rid}"

    def delete_txt_record(self, record_ref: str) -> None:
        zone_id, rid = record_ref.split(":", 1)
        self._request("DELETE", f"/zones/{zone_id}/dns_records/{rid}")


class AliyunDNSProvider(DNSProvider):
    def __init__(self, access_key_id: str, access_key_secret: str):
        self.ak = access_key_id
        self.sk = access_key_secret
        self.endpoint = "https://alidns.aliyuncs.com/"

    @staticmethod
    def _percent_encode(value: str) -> str:
        from urllib.parse import quote

        return quote(value, safe="~")

    def _sign(self, params: dict[str, str]) -> str:
        sorted_items = sorted(params.items(), key=lambda x: x[0])
        canonical = "&".join(
            [f"{self._percent_encode(k)}={self._percent_encode(v)}" for k, v in sorted_items]
        )
        string_to_sign = f"GET&%2F&{self._percent_encode(canonical)}"
        digest = hmac.new(f"{self.sk}&".encode(), string_to_sign.encode(), hashlib.sha1).digest()
        return base64.b64encode(digest).decode()

    def _request(self, action: str, extra: Mapping[str, str]) -> JSONDict:
        params: dict[str, str] = {
            "Action": action,
            "Format": "JSON",
            "Version": "2015-01-09",
            "AccessKeyId": self.ak,
            "SignatureMethod": "HMAC-SHA1",
            "Timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "SignatureVersion": "1.0",
            "SignatureNonce": str(uuid.uuid4()),
        }
        params.update(extra)
        params["Signature"] = self._sign(params)
        resp = requests.get(self.endpoint, params=params, timeout=30)
        data = cast(JSONDict, resp.json())
        if data.get("Code") == "Forbidden.RAM":
            detail = data.get("AccessDeniedDetail")
            raise RuntimeError(
                "Aliyun RAM 权限不足。请给当前 RAM 子账号授权 alidns:AddDomainRecord、"
                "alidns:DeleteDomainRecord、alidns:DescribeDomainRecords(至少对目标域名)。"
                f" 详细信息: {detail}"
            )
        if resp.status_code >= 400 or data.get("Code"):
            raise RuntimeError(f"Aliyun DNS API 失败: {data}")
        return data

    def _split_rr_and_domain(self, fqdn: str) -> tuple[str, str]:
        labels = _split_domain_labels(fqdn)
        if len(labels) < 3:
            raise ValueError("Aliyun 记录名解析失败,请确认域名格式。")
        domain = ".".join(labels[-2:])
        rr = ".".join(labels[:-2])
        return rr, domain

    def create_txt_record(self, fqdn: str, value: str) -> str:
        rr, domain = self._split_rr_and_domain(fqdn)
        data = self._request(
            "AddDomainRecord",
            {
                "DomainName": domain,
                "RR": rr,
                "Type": "TXT",
                "Value": value,
                "TTL": "600",
            },
        )
        return str(data["RecordId"])

    def delete_txt_record(self, record_ref: str) -> None:
        self._request("DeleteDomainRecord", {"RecordId": record_ref})


class DNSPodDNSProvider(DNSProvider):
    def __init__(self, secret_id: str, secret_key: str, region: str = "ap-guangzhou"):
        self.secret_id = secret_id
        self.secret_key = secret_key
        self.region = region
        self.endpoint = "dnspod.tencentcloudapi.com"
        self.service = "dnspod"
        self.version = "2021-03-23"

    def _sign_tc3(self, action: str, payload: Mapping[str, Any]) -> dict[str, str]:
        timestamp = int(time.time())
        date = datetime.fromtimestamp(timestamp, timezone.utc).strftime("%Y-%m-%d")
        canonical_uri = "/"
        canonical_querystring = ""
        canonical_headers = f"content-type:application/json; charset=utf-8\nhost:{self.endpoint}\n"
        signed_headers = "content-type;host"
        body = json.dumps(payload, ensure_ascii=False)
        hashed_request_payload = hashlib.sha256(body.encode("utf-8")).hexdigest()
        canonical_request = "\n".join(
            [
                "POST",
                canonical_uri,
                canonical_querystring,
                canonical_headers,
                signed_headers,
                hashed_request_payload,
            ]
        )
        algorithm = "TC3-HMAC-SHA256"
        credential_scope = f"{date}/{self.service}/tc3_request"
        string_to_sign = "\n".join(
            [
                algorithm,
                str(timestamp),
                credential_scope,
                hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
            ]
        )

        def _hmac_sha256(key: bytes, msg: str) -> bytes:
            return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

        secret_date = _hmac_sha256(("TC3" + self.secret_key).encode("utf-8"), date)
        secret_service = _hmac_sha256(secret_date, self.service)
        secret_signing = _hmac_sha256(secret_service, "tc3_request")
        signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

        authorization = (
            f"{algorithm} Credential={self.secret_id}/{credential_scope}, "
            f"SignedHeaders={signed_headers}, Signature={signature}"
        )
        return {
            "Authorization": authorization,
            "Content-Type": "application/json; charset=utf-8",
            "Host": self.endpoint,
            "X-TC-Action": action,
            "X-TC-Timestamp": str(timestamp),
            "X-TC-Version": self.version,
            "X-TC-Region": self.region,
        }

    def _request(self, action: str, payload: Mapping[str, Any]) -> JSONDict:
        headers = self._sign_tc3(action, payload)
        url = f"https://{self.endpoint}"
        resp = requests.post(url, headers=headers, data=json.dumps(payload, ensure_ascii=False), timeout=30)
        data = cast(JSONDict, resp.json())
        response = cast(JSONDict, data.get("Response", {}))
        err = response.get("Error")
        if resp.status_code >= 400 or err:
            raise RuntimeError(f"DNSPod API 失败: {data}")
        return response

    @staticmethod
    def _split_rr_and_domain(fqdn: str) -> tuple[str, str]:
        labels = _split_domain_labels(fqdn)
        if len(labels) < 3:
            raise ValueError("DNSPod 记录名解析失败,请确认域名格式。")
        domain = ".".join(labels[-2:])
        sub_domain = ".".join(labels[:-2])
        return sub_domain, domain

    def create_txt_record(self, fqdn: str, value: str) -> str:
        sub_domain, domain = self._split_rr_and_domain(fqdn)
        payload: dict[str, str | int] = {
            "Domain": domain,
            "SubDomain": sub_domain,
            "RecordType": "TXT",
            "RecordLine": "默认",
            "Value": value,
            "TTL": 120,
        }
        resp = self._request("CreateRecord", payload)
        return f"{domain}:{resp['RecordId']}"

    def delete_txt_record(self, record_ref: str) -> None:
        domain, rid = record_ref.split(":", 1)
        payload: dict[str, str | int] = {"Domain": domain, "RecordId": int(rid)}
        self._request("DeleteRecord", payload)


@dataclass
class CertResult:
    cert_pem: str
    chain_pem: str
    fullchain_pem: str
    private_key_pem: bytes
    primary_domain: str


def _build_dns_provider(args: argparse.Namespace) -> DNSProvider:
    if args.provider == "cloudflare":
        token = args.cloudflare_token
        if not token:
            raise ValueError("Cloudflare 需要 --cloudflare-token")
        return CloudflareDNSProvider(token)
    if args.provider == "aliyun":
        if not args.aliyun_access_key_id or not args.aliyun_access_key_secret:
            raise ValueError("Aliyun 需要 --aliyun-access-key-id 和 --aliyun-access-key-secret")
        return AliyunDNSProvider(args.aliyun_access_key_id, args.aliyun_access_key_secret)
    if args.provider == "dnspod":
        if not args.dnspod_secret_id or not args.dnspod_secret_key:
            raise ValueError("DNSPod 需要 --dnspod-secret-id 和 --dnspod-secret-key")
        return DNSPodDNSProvider(args.dnspod_secret_id, args.dnspod_secret_key)
    raise ValueError(f"不支持的 DNS Provider: {args.provider}")


def _generate_private_key(args: argparse.Namespace) -> tuple[PrivateKey, bytes]:
    if args.key_type == "rsa":
        key = rsa.generate_private_key(public_exponent=65537, key_size=args.rsa_bits)
    else:
        curve = ec.SECP256R1() if args.ec_curve == "secp256r1" else ec.SECP384R1()
        key = ec.generate_private_key(curve)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return key, key_pem


def _generate_csr(private_key: PrivateKey, domains: list[str]) -> bytes:
    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domains[0])])
    san = x509.SubjectAlternativeName([x509.DNSName(d) for d in domains])
    csr = (
        x509.CertificateSigningRequestBuilder()
        .subject_name(subject)
        .add_extension(san, critical=False)
        .sign(private_key, hashes.SHA256())
    )
    return csr.public_bytes(serialization.Encoding.PEM)


def _make_acme_client(account_key_path: Path, email: str, directory_url: str) -> tuple[client.ClientV2, JWKRSA | JWKEC]:
    if account_key_path.exists():
        account_key = cast(
            PrivateKey,
            serialization.load_pem_private_key(
            account_key_path.read_bytes(),
            password=None,
            ),
        )
    else:
        account_key = rsa.generate_private_key(public_exponent=65537, key_size=4096)
        account_key_path.parent.mkdir(parents=True, exist_ok=True)
        account_key_path.write_bytes(
            account_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    if isinstance(account_key, rsa.RSAPrivateKey):
        jwk = JWKRSA(key=account_key)
    else:
        jwk = JWKEC(key=account_key)

    net = client.ClientNetwork(jwk, user_agent="easycert/2.0")
    directory = messages.Directory.from_json(net.get(directory_url).json())
    acme_client = client.ClientV2(directory, net)
    try:
        regr = acme_client.new_account(
            messages.NewRegistration.from_data(email=email, terms_of_service_agreed=True)
        )
        # 某些版本下兜底确保已绑定 account(kid)
        if getattr(acme_client.net, "account", None) is None:
            acme_client.net.account = regr
    except acme_errors.ConflictError as ex:
        # 账号已存在:按 acme-python 约定通过 query_registration 绑定已有 account(kid)
        existing_regr = messages.RegistrationResource(
            body=messages.Registration.from_data(email=email),
            uri=ex.location,
        )
        acme_client.query_registration(existing_regr)
    except Exception as ex:
        raise RuntimeError(
            f"ACME 账号初始化失败,请检查邮箱与网络连通性: {ex.__class__.__name__}: {ex}"
        ) from ex
    return acme_client, jwk


def _resolve_txt_has_value(fqdn: str, expected_value: str) -> bool:
    try:
        answers = resolver.resolve(fqdn, "TXT", lifetime=8)
        txt_values = [b"".join(r.strings).decode() for r in answers]
        return expected_value in txt_values
    except Exception:
        return False


def issue_certificate(args: argparse.Namespace) -> CertResult:
    provider = _build_dns_provider(args)
    domains = _normalize_domains(args.domains)

    key_obj, key_pem = _generate_private_key(args)
    csr_pem = _generate_csr(key_obj, domains)

    account_key_path = Path(args.account_key).resolve()
    directory = args.server_url or (LE_STAGING_DIRECTORY if args.staging else LE_PRODUCTION_DIRECTORY)
    acme_client, jwk = _make_acme_client(account_key_path, args.email, directory)

    print("[INFO] 开始创建订单...")
    order = acme_client.new_order(csr_pem)
    record_refs: list[str] = []

    try:
        authorizations = cast(list[Any], getattr(order, "authorizations", []))
        for authz in authorizations:
            identifier = authz.body.identifier.value
            challenges = cast(list[Any], authz.body.challenges)
            challenge = next(
                (c for c in challenges if isinstance(c.chall, acme_challenges.DNS01)),
                None,
            )
            if not challenge:
                raise RuntimeError(f"未找到 DNS-01 挑战: {identifier}")

            challenge_domain = identifier[2:] if identifier.startswith("*.") else identifier
            fqdn = f"_acme-challenge.{challenge_domain}"
            value = challenge.chall.validation(jwk)

            print(f"[INFO] 写入 DNS TXT: {fqdn}")
            ref = provider.create_txt_record(fqdn, value)
            record_refs.append(ref)

            print(f"[INFO] 等待 DNS 传播 {args.dns_propagation_seconds} 秒...")
            time.sleep(args.dns_propagation_seconds)
            if _resolve_txt_has_value(fqdn, value):
                print(f"[OK] DNS TXT 已可解析: {fqdn}")
            else:
                print(f"[WARN] 本地 DNS 尚未解析到 TXT,继续请求 ACME 校验: {fqdn}")

            response = challenge.chall.response(jwk)
            acme_client.answer_challenge(challenge, response)

        print("[INFO] 等待 ACME 验证并签发证书...")
        finalized_order = acme_client.poll_and_finalize(order)
        fullchain_pem = finalized_order.fullchain_pem
        cert_pem = finalized_order.fullchain_pem.split("-----END CERTIFICATE-----")[0] + "-----END CERTIFICATE-----\n"
        chain_pem = fullchain_pem.replace(cert_pem, "", 1)

        return CertResult(
            cert_pem=cert_pem,
            chain_pem=chain_pem,
            fullchain_pem=fullchain_pem,
            private_key_pem=key_pem,
            primary_domain=domains[0],
        )
    finally:
        for rr in record_refs:
            try:
                provider.delete_txt_record(rr)
            except Exception as ex:
                print(f"[WARN] 清理 DNS TXT 失败({rr}): {ex}")


def _parse_chain_certificates(fullchain_pem: str) -> list[x509.Certificate]:
    certs: list[x509.Certificate] = []
    parts = fullchain_pem.split("-----END CERTIFICATE-----")
    for p in parts:
        p = p.strip()
        if not p:
            continue
        pem = p + "\n-----END CERTIFICATE-----\n"
        certs.append(x509.load_pem_x509_certificate(pem.encode()))
    return certs


def _certificate_not_valid_after_utc(cert: x509.Certificate) -> datetime:
    not_valid_after_utc = getattr(cert, "not_valid_after_utc", None)
    if isinstance(not_valid_after_utc, datetime):
        return not_valid_after_utc
    return cert.not_valid_after.replace(tzinfo=timezone.utc)


def _extract_cert_info(fullchain_pem: str) -> dict[str, str]:
    certs = _parse_chain_certificates(fullchain_pem)
    leaf = certs[0]
    san_ext = leaf.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    sans = san_ext.value.get_values_for_type(x509.DNSName)
    expire_at = _certificate_not_valid_after_utc(leaf)
    days_left = (expire_at - _now_utc()).days

    chain_brand = "未知"
    if len(certs) > 1:
        issuer_cn = certs[1].subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if issuer_cn:
            issuer_value = issuer_cn[0].value
            if isinstance(issuer_value, str):
                chain_brand = issuer_value
            elif isinstance(issuer_value, bytes):
                chain_brand = issuer_value.decode(errors="replace")
            else:
                chain_brand = str(issuer_value)
    return {
        "domains": ", ".join(sans),
        "expire_at": expire_at.strftime("%Y-%m-%d %H:%M:%S %Z"),
        "days_left": str(days_left),
        "chain_brand": chain_brand,
    }


def _https_snippet(primary_domain: str) -> dict[str, str]:
    apache = """RewriteEngine On
RewriteCond %{HTTPS} !=on
RewriteRule ^ https://%{HTTP_HOST}%{REQUEST_URI} [L,R=301]
"""
    nginx = f"""server {{
    listen 80;
    server_name {primary_domain};
    return 301 https://$host$request_uri;
}}
"""
    return {"apache": apache, "nginx": nginx}


def _rate_limit_guard(domains: list[str], staging: bool, force: bool) -> None:
    print("[INFO] Let's Encrypt 速率限制提醒:")
    print("       - 每个注册域名:每 7 天最多 50 张证书")
    print("       - 重复证书:每 7 天最多 5 张")
    print("       - 单张证书最多 100 个域名")
    if staging:
        print("[INFO] 当前为 Staging 模式,不消耗生产额度。")
        return

    state = _load_json(Path(STATE_FILE))
    raw_history = state.get("production_attempts", [])
    history: list[JSONDict] = []
    if isinstance(raw_history, list):
        for item in cast(list[Any], raw_history):
            if isinstance(item, dict):
                history.append(cast(JSONDict, item))
    now = _now_utc()
    history = [
        item
        for item in history
        if isinstance(item.get("ts"), str)
        and (now - datetime.fromisoformat(cast(str, item["ts"]))) <= timedelta(days=7)
    ]
    base_domains = {_base_domain_for_rate_limit(d) for d in domains}
    for bd in base_domains:
        count = sum(1 for item in history if item.get("base_domain") == bd)
        if count >= 45 and not force:
            raise RuntimeError(
                f"检测到 {bd} 近 7 天申请次数较高 ({count}),建议先使用 --staging,"
                "如确认继续请追加 --force。"
            )

    for bd in base_domains:
        history.append({"ts": now.isoformat(), "base_domain": bd})
    state["production_attempts"] = history
    _save_json(Path(STATE_FILE), state)


def _write_outputs(output_dir: Path, result: CertResult) -> dict[str, str]:
    output_dir.mkdir(parents=True, exist_ok=True)
    privkey_path = output_dir / "privkey.key"
    cert_path = output_dir / "cert.pem"
    chain_path = output_dir / "chain.pem"
    fullchain_path = output_dir / "fullchain.pem"

    privkey_path.write_bytes(result.private_key_pem)
    cert_path.write_text(result.cert_pem, encoding="utf-8")
    chain_path.write_text(result.chain_pem, encoding="utf-8")
    fullchain_path.write_text(result.fullchain_pem, encoding="utf-8")

    return {
        "privkey": str(privkey_path),
        "cert": str(cert_path),
        "chain": str(chain_path),
        "fullchain": str(fullchain_path),
    }


def _load_config_args(args: argparse.Namespace) -> argparse.Namespace:
    if not args.config:
        return args
    cfg = _load_json(Path(args.config))
    for k, v in cfg.items():
        if hasattr(args, k):
            current = getattr(args, k)
            if current in (None, [], ""):
                setattr(args, k, v)
    return args


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="EasyCert - 基于 ACME DNS-01 的 SSL 证书申请工具")
    parser.add_argument("--config", help="JSON 配置文件路径")
    parser.add_argument("--domains", nargs="+", help="域名列表,可空格分隔,支持 *.example.com")
    parser.add_argument("--email", help="ACME 账号邮箱")
    parser.add_argument("--provider", choices=["cloudflare", "aliyun", "dnspod"], help="DNS 服务商")

    parser.add_argument("--cloudflare-token", help="Cloudflare API Token")
    parser.add_argument("--aliyun-access-key-id", help="Aliyun AccessKey ID")
    parser.add_argument("--aliyun-access-key-secret", help="Aliyun AccessKey Secret")
    parser.add_argument("--dnspod-secret-id", help="DNSPod SecretId")
    parser.add_argument("--dnspod-secret-key", help="DNSPod SecretKey")

    parser.add_argument("--key-type", choices=["rsa", "ecc"], default="rsa")
    parser.add_argument("--rsa-bits", type=int, default=2048)
    parser.add_argument("--ec-curve", choices=["secp256r1", "secp384r1"], default="secp256r1")
    parser.add_argument("--dns-propagation-seconds", type=int, default=45)
    parser.add_argument("--output-dir", default="./cert-output")
    parser.add_argument("--account-key", default="./acme-account/account.key")
    parser.add_argument("--server-url", help="自定义 ACME Directory URL")
    parser.add_argument("--staging", action="store_true", default=True, help="使用 Let's Encrypt Staging(默认)")
    parser.add_argument("--production", action="store_true", help="使用 Let's Encrypt Production")
    parser.add_argument("--force", action="store_true", help="忽略生产环境频率风险提示")
    return parser


def validate_required_args(args: argparse.Namespace) -> None:
    if args.production:
        args.staging = False
    if not args.email:
        raise ValueError("缺少参数: --email")
    if not args.provider:
        raise ValueError("缺少参数: --provider")
    if not args.domains:
        raise ValueError("缺少参数: --domains")


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()
    args = _load_config_args(args)
    validate_required_args(args)

    domains = _normalize_domains(args.domains)
    _rate_limit_guard(domains, args.staging, args.force)
    args.domains = domains

    print("=" * 56)
    print(" EasyCert 2.0 - SSL/TLS 证书生命周期工具 (ACME DNS-01) ")
    print("=" * 56)
    print(f"[INFO] 模式: {'STAGING' if args.staging else 'PRODUCTION'}")
    print(f"[INFO] 申请域名: {', '.join(domains)}")
    print(f"[INFO] DNS Provider: {args.provider}")

    result = issue_certificate(args)
    output_dir = Path(args.output_dir).resolve() / domains[0].replace("*.", "wildcard.")
    output_paths = _write_outputs(output_dir, result)

    info = _extract_cert_info(result.fullchain_pem)
    snippets = _https_snippet(domains[0].replace("*.", ""))

    print("\n" + "★" * 26)
    print("证书申请成功")
    print("★" * 26)
    print(f"[输出] privkey.key  : {output_paths['privkey']}")
    print(f"[输出] fullchain.pem: {output_paths['fullchain']}")
    print(f"[输出] cert.pem     : {output_paths['cert']}")
    print(f"[输出] chain.pem    : {output_paths['chain']}")

    print("\n[证书信息]")
    print(f"- 认证域名: {info['domains']}")
    print(f"- 证书品牌/中间证书: Let's Encrypt {info['chain_brand']}")
    if info["chain_brand"] != "R13":
        print("  [WARN] 当前链并非 R13,请根据业务兼容性自行评估。")
    print(f"- 到期时间(UTC): {info['expire_at']}")
    print(f"- 剩余天数: {info['days_left']} 天")

    print("\n[Apache .htaccess - 强制 HTTPS]")
    print(snippets["apache"])

    print("[Nginx - 强制 HTTPS]")
    print(snippets["nginx"])


if __name__ == "__main__":
    try:
        main()
    except Exception as ex:
        print(f"[ERROR] {ex}")
        raise

README.md文档说明:

# EasyCert 2.0

一个可独立运行的 ACME DNS-01 证书申请工具,适用于虚拟主机(不便自动化部署)场景。

## 功能概览

- 多域名申请:支持单域名、多域名、子域名、通配符(`*.example.com`)
- 输出标准文件:
  - `privkey.key`(RSA 或 ECC)
  - `fullchain.pem`(完整证书链)
  - 同时输出 `cert.pem`、`chain.pem`
- 申请后自动展示:
  - 实际认证域名(SAN)
  - 中间证书品牌(例如 Let’s Encrypt R12/R13)
  - 到期时间与剩余天数
- 自动输出强制 HTTPS 跳转片段:
  - Apache `.htaccess`
  - Nginx `server` 块
- 默认 `staging` 预演,降低 Rate Limit 风险
- DNS-01 验证:已集成
  - Cloudflare
  - 阿里云 DNS(AliDNS)
  - DNSPod(腾讯云)

---

## 工具架构

1. **CLI 层(argparse)**
   - 参数解析
   - 配置文件合并(JSON)
   - 生产/测试模式切换

2. **DNS Provider 抽象层**
   - `DNSProvider` 抽象基类
   - `CloudflareDNSProvider`
   - `AliyunDNSProvider`
   - `DNSPodDNSProvider`

3. **ACME 执行层**
   - 创建/复用 ACME 账号密钥
   - 创建订单并完成 DNS-01 challenge
   - 轮询签发并拉取证书链

4. **证书与输出层**
   - 生成 `privkey.key`
   - 写出 `cert.pem` / `chain.pem` / `fullchain.pem`
   - 解析 SAN、过期时间、剩余天数
   - 生成 Apache/Nginx HTTPS 强制跳转模板

5. **频率限制守卫(软限制)**
   - 本地记录近 7 天生产申请
   - 高风险时提醒(可 `--force` 跳过)

---

## 安装

```bash
pip install -r requirements.txt
```

---

## 运行模式说明(staging 与 production)

- **默认是 staging**(测试环境,不消耗正式额度):

```bash
python easycert.py --config config.json
```

- **切换正式环境 production**(签发可用于线上):

```bash
python easycert.py --config config.json --production
```

建议流程:

1. 先 staging 预演;
2. 验证通过后再 production。

---

## 各 DNS 平台 API 凭据获取与配置(重点)

> 原则:**看 DNS 托管方,不看域名注册商**。  
> 例如域名在阿里云注册,但 NS 托管在 Cloudflare,就应使用 Cloudflare 凭据。

### 1) Cloudflare(默认 staging 示例)

#### 1.1 前提

- 域名 Zone 已添加到 Cloudflare。
- 域名 NS 已切换到 Cloudflare。

#### 1.2 获取 API Token(推荐方式)

1. Cloudflare 控制台 → **My Profile** → **API Tokens**。
2. 创建 Token(建议从模板编辑):
   - 权限至少包含:
     - `Zone:Read`
     - `DNS:Edit`
   - Zone Resources 选择目标域名(如 `baimahao.com`)。
3. 生成后复制 Token(只显示一次)。

#### 1.3 命令示例

```bash
python easycert.py \
  --domains example.com www.example.com *.example.com \
  --email admin@example.com \
  --provider cloudflare \
  --cloudflare-token <CF_API_TOKEN>
```

#### 1.4 生产证书

```bash
python easycert.py \
  --domains example.com www.example.com *.example.com \
  --email admin@example.com \
  --provider cloudflare \
  --cloudflare-token <CF_API_TOKEN> \
  --production
```

---

### 2) 阿里云 DNS(AliDNS)

#### 2.1 前提

- 域名 DNS 托管在阿里云云解析 DNS。

#### 2.2 获取 AccessKey(推荐 RAM 子账号,不用主账号)

1. 阿里云控制台 → **RAM 访问控制**。
2. 创建 RAM 用户(编程访问)。
3. 为该 RAM 用户创建 `AccessKeyId` / `AccessKeySecret`。
4. 给 RAM 用户授权策略(至少):
   - `alidns:AddDomainRecord`
   - `alidns:DeleteDomainRecord`
   - `alidns:DescribeDomainRecords`

> 若报 `Forbidden.RAM`,说明权限未生效或未授权到该子账号。

#### 2.3 命令示例

```bash
python easycert.py \
  --domains example.com *.example.com \
  --email admin@example.com \
  --provider aliyun \
  --aliyun-access-key-id <AK> \
  --aliyun-access-key-secret <SK>
```

#### 2.4 注意

- 当前实现中 AliDNS TXT 记录 TTL 使用 `600`(满足阿里云 600~86400 要求)。

---

### 3) DNSPod(腾讯云)

#### 3.1 前提

- 域名 DNS 托管在 DNSPod(腾讯云)。

#### 3.2 获取 SecretId / SecretKey

1. 腾讯云控制台 → **访问管理 CAM** → **API 密钥管理**。
2. 创建/查看 API 密钥,获取 `SecretId` 与 `SecretKey`。
3. 给对应子账号授予 DNSPod 记录读写权限(至少能增删解析记录)。

#### 3.3 命令示例

```bash
python easycert.py \
  --domains example.com *.example.com \
  --email admin@example.com \
  --provider dnspod \
  --dnspod-secret-id <SID> \
  --dnspod-secret-key <SKEY>
```

---

### 4) 腾讯云 DNS(说明)

- 本工具当前内置的是 **DNSPod API 接入**(`--provider dnspod`)。
- 若你的域名解析在腾讯云 DNSPod,直接使用上面的 DNSPod 配置即可。

---

### 5) 百度云 DNS(说明)

- **当前版本未内置百度云 DNS Provider**,因此不能直接使用百度云 API 凭据。
- 可选方案:
  1. 将 DNS 托管切换到已支持平台(Cloudflare / 阿里云 DNS / DNSPod);
  2. 按现有 `DNSProvider` 抽象新增 `BaiduDNSProvider` 后再使用。

---

## 配置文件方式(JSON)

### Cloudflare 示例

```json
{
  "domains": ["example.com", "www.example.com", "*.example.com"],
  "email": "admin@example.com",
  "provider": "cloudflare",
  "cloudflare_token": "<CF_API_TOKEN>",
  "key_type": "rsa",
  "rsa_bits": 2048,
  "dns_propagation_seconds": 45,
  "output_dir": "./cert-output"
}
```

### 阿里云 DNS 示例

```json
{
  "domains": ["example.com", "www.example.com"],
  "email": "admin@example.com",
  "provider": "aliyun",
  "aliyun_access_key_id": "<AK>",
  "aliyun_access_key_secret": "<SK>",
  "dns_propagation_seconds": 45,
  "output_dir": "./cert-output"
}
```

### DNSPod(腾讯云)示例

```json
{
  "domains": ["example.com", "www.example.com"],
  "email": "admin@example.com",
  "provider": "dnspod",
  "dnspod_secret_id": "<SID>",
  "dnspod_secret_key": "<SKEY>",
  "dns_propagation_seconds": 45,
  "output_dir": "./cert-output"
}
```

运行:

```bash
python easycert.py --config config.json
```

生产证书:

```bash
python easycert.py --config config.json --production
```

---

## 输出目录

默认输出到:

- `./cert-output/<主域名>/privkey.key`
- `./cert-output/<主域名>/fullchain.pem`
- `./cert-output/<主域名>/cert.pem`
- `./cert-output/<主域名>/chain.pem`

---

## Let's Encrypt 限速建议

- 单证书最多 100 个域名
- 每个注册域名每 7 天最多 50 张证书
- 重复证书每 7 天最多 5 张

---

## 注意事项

- 通配符证书必须使用 DNS-01。
- API 凭据必须有“新增/删除 TXT 记录”权限。
- 某些 DNS 平台传播慢,可调大 `--dns-propagation-seconds`。
- 中间证书链可能是 R12 或 R13,均可能由 Let’s Encrypt 按策略下发。
- 一旦 API Token/AK/SK 泄露,请立即吊销并重建。

2.1 系统要求

  • Python 3.8+

  • 能访问公网的计算机(本地或云主机均可)

  • 域名DNS托管在支持的平台上

2.2 安装依赖

pip install acme>=2.11.0 josepy>=1.14.0 cryptography>=43.0.0 requests>=2.32.0 dnspython>=2.6.1

或直接使用项目提供的requirements.txt:

requirements

pip install -r requirements.txt

三、DNS服务商配置完整指南

重要原则:看DNS托管方,不看域名注册商。
例如域名在阿里云注册,但NS托管在Cloudflare,就应使用Cloudflare凭据。

本工具目前内置支持:Cloudflare阿里云DNSDNSPod(腾讯云)

以下分别说明各平台的完整配置流程,包括:①凭证获取 → ②config.json配置 → ③运行命令。


3.1 Cloudflare(国际用户首选)

适用场景:域名NS托管在Cloudflare,或希望使用全球CDN加速DNS解析。

前提条件:域名Zone已添加到Cloudflare,且NS已切换到Cloudflare。

第一步:获取API Token

  1. 登录Cloudflare控制台 → My ProfileAPI Tokens

  2. 点击"Create Token",建议从"Edit zone DNS"模板开始编辑

  3. 权限配置:

    • Zone:Read(读取Zone信息)

    • DNS:Edit(编辑DNS记录)

  4. Zone Resources选择"Include → Specific zone → 您的域名"

  5. 生成后复制Token(仅显示一次

第二步:创建config.json

{
  "domains": ["example.com", "www.example.com", "*.example.com"],
  "email": "admin@example.com",
  "provider": "cloudflare",
  "cloudflare_token": "YOUR_CF_API_TOKEN",
  "key_type": "rsa",
  "rsa_bits": 2048,
  "dns_propagation_seconds": 60,
  "output_dir": "./cert-output"
}

第三步:运行脚本

测试模式(Staging,不消耗正式额度):

python easycert.py --config config.json

生产模式(正式证书):

python easycert.py --config config.json --production

命令行方式(不使用配置文件):

python easycert.py \
  --domains example.com www.example.com *.example.com \
  --email admin@example.com \
  --provider cloudflare \
  --cloudflare-token YOUR_CF_API_TOKEN \
  --production

3.2 阿里云云解析DNS(国内用户主流选择)

适用场景:域名DNS托管在阿里云云解析,适合国内业务优化解析线路。

安全原则:必须使用RAM子账号,严禁使用主账号AccessKey。

第一步:创建RAM子账号并生成密钥

  1. 登录阿里云控制台,在顶部搜索栏输入 RAM 并进入"访问控制"控制台

  2. 左侧菜单栏选择 身份管理用户

  3. 点击 创建用户

    • 登录名称EasyCert-User(或其他易识别名称)

    • 显示名称:同上

    • 访问方式务必勾选"OpenAPI 调用访问",这样才会生成AccessKey IDAccessKey Secret

  4. 点击确定后,页面会显示生成的密钥。立即保存AccessKey IDAccessKey Secret,Secret只会显示这一次

第二步:创建最小权限策略

  1. 左侧菜单选择 权限管理策略

  2. 点击 创建策略,选择 脚本编辑,粘贴以下JSON代码:

{
    "Version": "1",
    "Statement": [
        {
            "Action": [
                "alidns:AddDomainRecord",
                "alidns:DeleteDomainRecord",
                "alidns:DescribeDomainRecords",
                "alidns:DescribeDomains"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

说明DescribeDomains用于脚本自动定位域名Zone信息,建议保留以确保兼容性。

  1. 点击 下一步策略名称填写EasyCertDNSPolicy,点击确定保存

第三步:为子账号授权

  1. 回到 身份管理用户 列表

  2. 找到刚才创建的EasyCert-User,点击右侧 添加权限

  3. 在"选择策略"页面:

    • 授权范围:默认"云账号全部资源"即可

    • 选择 自定义策略

    • 搜索并勾选EasyCertDNSPolicy

  4. 点击 确定 完成授权

权限通常即时生效,无需等待。

第四步:创建config.json

{
  "domains": ["example.com", "*.example.com"],
  "email": "admin@example.com",
  "provider": "aliyun",
  "aliyun_access_key_id": "YOUR_ACCESS_KEY_ID",
  "aliyun_access_key_secret": "YOUR_ACCESS_KEY_SECRET",
  "key_type": "rsa",
  "rsa_bits": 2048,
  "dns_propagation_seconds": 60,
  "output_dir": "./cert-output"
}

第五步:运行脚本

测试模式:

python easycert.py --config config.json

观察输出,确认看到[OK] DNS TXT 已可解析且最终提示证书申请成功,说明配置完全正确。

生产模式:

python easycert.py --config config.json --production

命令行方式:

python easycert.py \
  --domains example.com *.example.com \
  --email admin@example.com \
  --provider aliyun \
  --aliyun-access-key-id YOUR_AK_ID \
  --aliyun-access-key-secret YOUR_AK_SECRET \
  --production

常见问题排查

问题现象排查方向
Forbidden.RAM或权限不足报错检查策略是否包含DescribeDomains;确认AccessKey属于该子账号而非主账号
DNS验证超时增加dns_propagation_seconds至120秒;阿里云DNS要求TXT记录TTL为600秒,工具已自动适配
InvalidAccessKeyId.NotFoundAccessKey ID输入错误,或使用了已删除的密钥

3.3 DNSPod(腾讯云)

适用场景:域名DNS托管在DNSPod(腾讯云),国内解析速度快,API稳定性高。

第一步:获取SecretId/SecretKey

方式一:通过DNSPod控制台(推荐,权限更精准)

  1. 登录 DNSPod控制台

  2. 进入用户中心API密钥(或访问 https://console.dnspod.cn/account/token)

  3. 点击"创建密钥",填写描述(如"Let's Encrypt证书申请")

  4. 选择权限范围:建议仅勾选域名解析相关权限

  5. 复制ID(即SecretId)和Token(即SecretKey)

方式二:通过腾讯云CAM(适合企业子账号管理)

  1. 登录 腾讯云控制台

  2. 进入访问管理API密钥管理

  3. 点击"新建密钥",复制SecretIdSecretKey

  4. 确保账号拥有DNSPod权限:QcloudDNSPodFullAccess或自定义策略

第二步:创建config.json

{
  "domains": ["example.com", "*.example.com"],
  "email": "admin@example.com",
  "provider": "dnspod",
  "dnspod_secret_id": "YOUR_SECRET_ID",
  "dnspod_secret_key": "YOUR_SECRET_KEY",
  "key_type": "rsa",
  "rsa_bits": 2048,
  "dns_propagation_seconds": 60,
  "output_dir": "./cert-output"
}

第三步:运行脚本

测试模式:

python easycert.py --config config.json

生产模式:

python easycert.py --config config.json --production

命令行方式:

python easycert.py \
  --domains example.com *.example.com \
  --email admin@example.com \
  --provider dnspod \
  --dnspod-secret-id YOUR_SID \
  --dnspod-secret-key YOUR_SKEY \
  --production

四、其他DNS服务商说明

4.1 华为云、百度云、京东云

服务商支持状态说明
华为云需自行扩展提供DNS API,需按DNSProvider抽象类实现HuaweiDNSProvider
百度云需自行扩展当前版本未内置,建议将DNS托管至已支持平台
京东云需自行扩展当前版本未内置,建议将DNS托管至已支持平台

建议方案:若您的域名DNS托管在华为云、百度云或京东云,最快捷的方式是将NS记录临时切换至Cloudflare、阿里云DNS或DNSPod,完成证书申请后再切回(注意:切换DNS服务商需等待24-48小时全球生效,请谨慎操作)。


五、Let's Encrypt速率限制与最佳实践

5.1 核心限制

根据Let's Encrypt官方文档:

限制类型具体规则重置周期
单证书域名数最多100个标识符-
每注册域名每7天最多50张证书7天滑动窗口
重复证书相同域名组合每7天最多5张7天滑动窗口
新订单单个账户每3小时最多300个3小时

5.2 最佳实践

  1. Staging先行:任何新域名或流程变更前,必须先在Staging环境完成端到端验证

  2. 合并域名:将多个子域名合并至单张证书,减少总量消耗

  3. 本地记录:工具会自动记录生产环境申请历史,近7天申请超过45次时发出警告(可--force跳过)


六、证书输出与部署

6.1 输出文件说明

运行成功后,证书文件输出至./cert-output/<主域名>/目录:

文件名用途部署位置
privkey.key私钥服务器SSL配置中的Key文件
cert.pem服务器证书(仅叶子证书)部分面板要求单独上传
chain.pem中间证书链部分面板要求单独上传
fullchain.pem完整证书链(cert+chain)大多数Web服务器直接使用

6.2 虚拟主机部署步骤

  1. 登录虚拟主机控制面板(如cPanel、Plesk、宝塔等)

  2. 进入SSL/TLS管理界面

  3. 上传证书文件

    • 证书(CRT/Certificate):粘贴fullchain.pem内容,或分别粘贴cert.pem+chain.pem

    • 私钥(Key):粘贴privkey.key内容

  4. 启用HTTPS强制跳转(可选):

工具会自动输出Apache和Nginx的强制跳转配置片段,例如:

Apache .htaccess:

RewriteEngine On
RewriteCond %{HTTPS} !=on
RewriteRule ^ https://%{HTTP_HOST}%{REQUEST_URI} [L,R=301]

Nginx:

server {
    listen 80;
    server_name example.com;
    return 301 https://$host$request_uri;
}

七、安全建议

  1. 凭证管理:将API Token/AccessKey存储于配置文件并设置严格文件权限(chmod 600 config.json),避免硬编码于脚本

  2. 子账号原则:所有平台均应创建专用子账号,授予最小必要权限,禁用不需要的API访问

  3. 账户密钥:工具自动生成的ACME账户密钥(./acme-account/account.key)需妥善备份,丢失将导致无法续期

  4. 密钥轮换:定期(建议每90天)更换DNS API凭证,降低泄露风险

  5. 泄露应急:如怀疑密钥泄露,立即在对应云平台禁用该密钥,并重新生成配置


八、完整配置示例汇总

8.1 多域名混合配置(Cloudflare)

{
  "domains": ["example.com", "www.example.com", "*.example.com", "api.example.com"],
  "email": "admin@example.com",
  "provider": "cloudflare",
  "cloudflare_token": "YOUR_CF_API_TOKEN",
  "key_type": "ecc",
  "ec_curve": "secp256r1",
  "dns_propagation_seconds": 60,
  "output_dir": "./cert-output"
}

8.2 国内业务优化(阿里云)

{
  "domains": ["example.cn", "*.example.cn"],
  "email": "admin@example.cn",
  "provider": "aliyun",
  "aliyun_access_key_id": "YOUR_AK_ID",
  "aliyun_access_key_secret": "YOUR_AK_SECRET",
  "key_type": "rsa",
  "rsa_bits": 2048,
  "dns_propagation_seconds": 60,
  "output_dir": "/opt/certs"
}

8.3 腾讯云生态(DNSPod)

{
  "domains": ["example.com", "*.example.com"],
  "email": "admin@example.com",
  "provider": "dnspod",
  "dnspod_secret_id": "YOUR_SID",
  "dnspod_secret_key": "YOUR_SKEY",
  "dns_propagation_seconds": 60,
  "output_dir": "./cert-output"
}

九、总结

对于虚拟主机用户,通过Python实现的ACME DNS-01客户端是获取Let's Encrypt证书的可行路径。本工具已内置Cloudflare、阿里云、腾讯云DNSPod三大主流服务商支持,覆盖国内外大多数用户场景。

标准操作流程回顾

  1. 确认域名DNS托管方

  2. 按第三章对应平台教程获取API凭证

  3. 创建config.json配置文件

  4. 先Staging测试,验证通过后Production正式申请

  5. 将输出证书部署至虚拟主机面板

  6. 设置到期提醒,每90天重复申请流程

严格遵循"Staging先行、最小权限、密钥轮换"的安全原则,即可在虚拟主机环境中稳定、安全地获取免费SSL证书。