python 改编自https://github.com/diafygi/acme-tiny/blob/master/acme_tiny.py,FTP文件上传或者阿里云OSS上传.https://fi

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 改编自https://github.com/diafygi/acme-tiny/blob/master/acme_tiny.py,FTP文件上传或者阿里云OSS上传.https://fi相关的知识,希望对你有一定的参考价值。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '16/3/4'

# !/usr/bin/env python
import ftplib
import hmac
import subprocess, json, base64, binascii, time, hashlib, re, copy, textwrap

import datetime
import requests
from cStringIO import StringIO

# Base URL of the ACME CA; get_crt appends "/directory" and "/acme/..." paths to it.
CA = "https://acme-v01.api.letsencrypt.org"

# Settings read by oss_verify: KEY and SECRET build the "OSS KEY:signature"
# Authorization header, BUCKET is part of the signed resource path, and `url`
# is the host the PUT request is sent to. The values here are placeholders.
KEY = '-----'
SECRET = '------'
BUCKET = 'ficapy'
url = 'ficapy.oss-cn-hangzhou.aliyuncs.com'

# Settings read by ftp_verify: server address and login. Placeholders as well.
FTP_URL = '-------'
FTP_USER = '-------'
FTP_PWD = '-------'

# Maps each domain to the NAME of the module-level upload function
# (oss_verify / ftp_verify) that get_crt looks up through globals() to publish
# that domain's challenge file.
DOMAIN_MAP = {
    'ficapy.com': 'oss_verify',
    'www.ficapy.com': 'oss_verify',
    # 'zoulei.net': 'ftp_verify',
    # 'www.zoulei.net': 'ftp_verify',
}

# Single requests session shared by every HTTP call in this module.
s = requests.Session()


def oss_verify(file_path, data):
    """Upload ``data`` to the OSS bucket at ``file_path`` using a signed PUT.

    Args:
        file_path: Object key inside the bucket, without a leading slash
            (e.g. ``.well-known/acme-challenge/<token>``).
        data: Request body to store as the object content.

    Raises:
        requests.HTTPError: If OSS answers with a 4xx/5xx status.
    """
    # Function-scope import: only this uploader needs it.
    from email.utils import formatdate

    content_type = 'text/html'
    # formatdate(usegmt=True) yields "Thu, 01 Jan 2015 00:00:00 GMT" with
    # English day/month names regardless of the process locale, unlike
    # strftime('%a ... %b'). It also avoids shadowing the `time` module.
    date = formatdate(usegmt=True)

    # Signed string: verb, an empty line (presumably the Content-MD5 slot of
    # the OSS header-signature scheme, which is not sent here), content type,
    # date and the bucket-qualified resource.
    string_to_sign = '\n'.join([
        'PUT',
        '',
        content_type,
        date,
        '/' + BUCKET + '/' + file_path,
    ])
    signature = base64.b64encode(
        hmac.new(SECRET, string_to_sign, hashlib.sha1).digest()).strip()

    headers = {
        'Date': date,
        'Content-Type': content_type,
        'HOST': url,
        'Authorization': 'OSS ' + KEY + ':' + signature,
    }
    s.put('http://' + url + '/' + file_path, headers=headers, data=data).raise_for_status()


def ftp_verify(file_path, data):
    """Upload ``data`` to the FTP server as ``file_path``, creating directories.

    The path is interpreted relative to the FTP root. Each intermediate
    directory is entered, and created first when entering it fails.

    Args:
        file_path: Slash-separated destination path; the last component is the
            file name, everything before it is the directory chain.
        data: File content to store.

    Raises:
        ftplib.all_errors: Any connection, login or transfer failure is
            propagated to the caller.
    """
    pieces = file_path.split('/')
    # Drop empty components (leading or doubled slashes) so we never issue
    # MKD/CWD with an empty name.
    directories = [piece for piece in pieces[:-1] if piece]
    file_name = pieces[-1]

    ftp = ftplib.FTP(FTP_URL)
    try:
        ftp.login(FTP_USER, FTP_PWD)
        ftp.cwd('/')
        for directory in directories:
            try:
                ftp.cwd(directory)
            except ftplib.error_perm:
                # Directory is missing: create it, then enter it.
                ftp.mkd(directory)
                ftp.cwd(directory)
        ftp.storbinary('STOR ' + file_name, StringIO(data))
    finally:
        # Always release the connection, even when a step above raised.
        ftp.close()


def get_crt(account_key, csr):
    """Obtain a signed certificate from the ACME CA for the domains in a CSR.

    Steps: derive the account's JWK from the private key, read the domains
    from the CSR, register the account, complete an ``http-01`` challenge for
    every domain (publishing the challenge file through the uploader named in
    ``DOMAIN_MAP``), then request the certificate.

    Args:
        account_key: Path to the account private key file (passed to
            ``openssl rsa -in`` and ``openssl dgst -sign``).
        csr: Path to the certificate signing request file.

    Returns:
        The signed certificate as a PEM-formatted string.

    Raises:
        IOError: If an ``openssl`` invocation exits with a non-zero status.
        ValueError: If the CA rejects a request, or a challenge file cannot
            be fetched back or does not validate.
        KeyError: If a CSR domain has no entry in ``DOMAIN_MAP``.
    """
    # helper function base64 encode for jose spec
    def _b64(b):
        return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")

    # parse account key to get public key
    proc = subprocess.Popen(["openssl", "rsa", "-in", account_key, "-noout", "-text"],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    if proc.returncode != 0:
        raise IOError("OpenSSL Error: {0}".format(err))
    pub_hex, pub_exp = re.search(
        r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
        out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()
    pub_exp = "{0:x}".format(int(pub_exp))
    # unhexlify needs an even number of hex digits
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    header = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }
    accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
    thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())

    # helper function make signed requests; returns (status_code, body)
    def _send_signed_request(endpoint, payload):
        payload64 = _b64(json.dumps(payload).encode('utf8'))
        protected = copy.deepcopy(header)
        # every signed request carries a fresh nonce taken from the directory response
        protected["nonce"] = s.get(CA + "/directory").headers['Replay-Nonce']
        protected64 = _b64(json.dumps(protected).encode('utf8'))
        proc = subprocess.Popen(["openssl", "dgst", "-sha256", "-sign", account_key],
                                stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))
        if proc.returncode != 0:
            raise IOError("OpenSSL Error: {0}".format(err))
        data = json.dumps({
            "header": header, "protected": protected64,
            "payload": payload64, "signature": _b64(out),
        })
        resp = s.post(endpoint, data.encode('utf8'))
        return resp.status_code, resp.content

    # find domains
    print("Parsing CSR...")
    proc = subprocess.Popen(["openssl", "req", "-in", csr, "-noout", "-text"],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    if proc.returncode != 0:
        raise IOError("Error loading {0}: {1}".format(csr, err))
    csr_text = out.decode('utf8')
    domains = set()
    common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", csr_text)
    if common_name is not None:
        domains.add(common_name.group(1))
    subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", csr_text,
                                  re.MULTILINE | re.DOTALL)
    if subject_alt_names is not None:
        for san in subject_alt_names.group(1).split(", "):
            if san.startswith("DNS:"):
                domains.add(san[4:])

    # register the account key with the CA
    print("Registering account...")
    code, result = _send_signed_request(CA + "/acme/new-reg", {
        "resource": "new-reg",
        "agreement": "https://letsencrypt.org/documents/LE-SA-v1.0.1-July-27-2015.pdf",
    })
    if code == 201:
        print("Registered!")
    elif code == 409:
        print("Already registered!")
    else:
        raise ValueError("Error registering: {0} {1}".format(code, result))
    # NOTE: a stray bare `return` used to sit here, which made the function
    # stop after registration and always return None.

    # verify each domain
    for domain in domains:
        print("Verifying {0}...".format(domain))

        # get new challenge
        code, result = _send_signed_request(CA + "/acme/new-authz", {
            "resource": "new-authz",
            "identifier": {"type": "dns", "value": domain},
        })
        if code != 201:
            raise ValueError("Error requesting challenges: {0} {1}".format(code, result))

        # make the challenge file
        challenge = [c for c in json.loads(result)['challenges'] if c['type'] == "http-01"][0]
        token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
        keyauthorization = "{0}.{1}".format(token, thumbprint)
        # publish the file through the uploader configured for this domain
        # (OSS or FTP; a local webroot is not supported)
        globals()[DOMAIN_MAP[domain]]('.well-known/acme-challenge/{}'.format(token), keyauthorization)

        # check that the file is in place
        wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
        try:
            resp_data = s.get(wellknown_url).content.strip()
        except IOError:
            raise ValueError("couldn't download {0}".format(wellknown_url))
        # explicit comparison instead of `assert`, which is stripped under -O
        if resp_data != keyauthorization:
            raise ValueError("couldn't download {0}".format(wellknown_url))

        # notify challenge are met
        code, result = _send_signed_request(challenge['uri'], {
            "resource": "challenge",
            "keyAuthorization": keyauthorization,
        })
        if code != 202:
            raise ValueError("Error triggering challenge: {0} {1}".format(code, result))

        # wait for challenge to be verified
        while True:
            try:
                challenge_status = s.get(challenge['uri']).json()
            except IOError as e:
                # requests exceptions carry no .code/.text, so report the
                # exception itself
                raise ValueError("Error checking challenge: {0}".format(e))
            if challenge_status['status'] == "pending":
                time.sleep(2)
            elif challenge_status['status'] == "valid":
                print("{0} verified!".format(domain))
                break
            else:
                raise ValueError("{0} challenge did not pass: {1}".format(
                    domain, challenge_status))

    # get the new certificate
    print("Signing certificate...")
    proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    csr_der, err = proc.communicate()
    if proc.returncode != 0:
        raise IOError("Error loading {0}: {1}".format(csr, err))
    code, result = _send_signed_request(CA + "/acme/new-cert", {
        "resource": "new-cert",
        "csr": _b64(csr_der),
    })
    if code != 201:
        raise ValueError("Error signing certificate: {0} {1}".format(code, result))

    # return signed certificate!
    print("Certificate signed!")
    return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
        "\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))


# Script entry point: call get_crt with the hard-coded key and CSR file names
# (resolved relative to the current working directory) and print its result.
if __name__ == "__main__":  # pragma: no cover
    print(get_crt('account.key', 'domain.csr'))

以上是关于python 改编自https://github.com/diafygi/acme-tiny/blob/master/acme_tiny.py,FTP文件上传或者阿里云OSS上传.https://fi的主要内容,如果未能解决你的问题,请参考以下文章:

ini Python日志配置(到控制台和文件,改编自python-guide.org)

python 将数据从提供的位置备份到另一个位置的压缩文件。解决方案改编自使用Python自动化无聊的东西。

python 将数据从提供的位置备份到另一个位置的压缩文件。解决方案改编自使用Python自动化无聊的东西。

python 移动pytorch的mnist数据集,改编自https://gist.github.com/tencia/afb129122a64bde3bd0c

python 修改自https://github.com/oostendo/python-zxing用于跨平台。

diagrammeR - 改编自“如何在我的流程图中添加武器?”