python 改编自https://github.com/diafygi/acme-tiny/blob/master/acme_tiny.py,FTP文件上传或者阿里云OSS上传.https://fi
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 改编自https://github.com/diafygi/acme-tiny/blob/master/acme_tiny.py,FTP文件上传或者阿里云OSS上传.https://fi相关的知识,希望对你有一定的参考价值。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '16/3/4'
# !/usr/bin/env python
import ftplib
import hmac
import subprocess, json, base64, binascii, time, hashlib, re, copy, textwrap
import datetime
import requests
from cStringIO import StringIO
CA = "https://acme-v01.api.letsencrypt.org"
KEY = '-----'
SECRET = '------'
BUCKET = 'ficapy'
url = 'ficapy.oss-cn-hangzhou.aliyuncs.com'
FTP_URL = '-------'
FTP_USER = '-------'
FTP_PWD = '-------'
DOMAIN_MAP = {
'ficapy.com': 'oss_verify',
'www.ficapy.com': 'oss_verify',
# 'zoulei.net': 'ftp_verify',
# 'www.zoulei.net': 'ftp_verify',
}
s = requests.Session()
def oss_verify(file_path, data):
time = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
headers = {
'Date': time,
'Content-Type': 'text/html',
'HOST': url,
'Authorization': 'OSS ' + KEY + ':' + base64.b64encode(hmac.new(SECRET,
'PUT' + "\n"
+ '' + "\n"
+ 'text/html' + "\n"
+ time + "\n"
+ '/' + BUCKET + '/' + file_path,
hashlib.sha1).digest()).strip()
}
s.put('http://' + url + '/' + file_path, headers=headers, data=data).raise_for_status()
def ftp_verify(file_path, data):
ftp = ftplib.FTP(FTP_URL)
ftp.login(FTP_USER, FTP_PWD)
_dir = file_path.split('/')[:-1]
file_name = file_path.split('/')[-1]
ftp.cwd('/')
for i in _dir:
if i in ftp.nlst():
pass
else:
ftp.mkd(i)
ftp.cwd(i)
f = StringIO(data)
ftp.storbinary('STOR ' + file_name, f)
ftp.close()
def get_crt(account_key, csr):
# helper function base64 encode for jose spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
# parse account key to get public key
proc = subprocess.Popen(["openssl", "rsa", "-in", account_key, "-noout", "-text"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
header = {
"alg": "RS256",
"jwk": {
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"kty": "RSA",
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
}
accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())
# helper function make signed requests
def _send_signed_request(url, payload):
payload64 = _b64(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(header)
protected["nonce"] = s.get(CA + "/directory").headers['Replay-Nonce']
protected64 = _b64(json.dumps(protected).encode('utf8'))
proc = subprocess.Popen(["openssl", "dgst", "-sha256", "-sign", account_key],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))
if proc.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
data = json.dumps({
"header": header, "protected": protected64,
"payload": payload64, "signature": _b64(out),
})
resp = s.post(url, data.encode('utf8'))
return resp.status_code, resp.content
# find domains
print("Parsing CSR...")
proc = subprocess.Popen(["openssl", "req", "-in", csr, "-noout", "-text"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
if proc.returncode != 0:
raise IOError("Error loading {0}: {1}".format(csr, err))
domains = set([])
common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", out.decode('utf8'))
if common_name is not None:
domains.add(common_name.group(1))
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'),
re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.startswith("DNS:"):
domains.add(san[4:])
# get the certificate domains and expiration
print("Registering account...")
code, result = _send_signed_request(CA + "/acme/new-reg", {
"resource": "new-reg",
"agreement": "https://letsencrypt.org/documents/LE-SA-v1.0.1-July-27-2015.pdf",
})
if code == 201:
print ("Registered!")
elif code == 409:
print ("Already registered!")
else:
raise ValueError("Error registering: {0} {1}".format(code, result))
return
# verify each domain
for domain in domains:
print ("Verifying {0}...".format(domain))
# get new challenge
code, result = _send_signed_request(CA + "/acme/new-authz", {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
})
if code != 201:
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
# make the challenge file
challenge = [c for c in json.loads(result)['challenges'] if c['type'] == "http-01"][0]
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
keyauthorization = "{0}.{1}".format(token, thumbprint)
# 上传到阿里云OSS(不考虑本地)
globals()[DOMAIN_MAP[domain]]('.well-known/acme-challenge/{}'.format(token), keyauthorization)
# check that the file is in place
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
try:
resp = s.get(wellknown_url)
resp_data = resp.content.strip()
assert resp_data == keyauthorization
except (IOError, AssertionError):
raise ValueError("couldn't download {0}".format(wellknown_url))
# notify challenge are met
code, result = _send_signed_request(challenge['uri'], {
"resource": "challenge",
"keyAuthorization": keyauthorization,
})
if code != 202:
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
# wait for challenge to be verified
while True:
try:
challenge_status = s.get(challenge['uri']).json()
except IOError as e:
raise ValueError("Error checking challenge: {0} {1}".format(
e.code, e.text))
if challenge_status['status'] == "pending":
time.sleep(2)
elif challenge_status['status'] == "valid":
print("{0} verified!".format(domain))
break
else:
raise ValueError("{0} challenge did not pass: {1}".format(
domain, challenge_status))
# get the new certificate
print("Signing certificate...")
proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
csr_der, err = proc.communicate()
code, result = _send_signed_request(CA + "/acme/new-cert", {
"resource": "new-cert",
"csr": _b64(csr_der),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
# return signed certificate!
print("Certificate signed!")
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
"\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))
if __name__ == "__main__": # pragma: no cover
print(get_crt('account.key', 'domain.csr'))
以上是关于python 改编自https://github.com/diafygi/acme-tiny/blob/master/acme_tiny.py,FTP文件上传或者阿里云OSS上传.https://fi的主要内容,如果未能解决你的问题,请参考以下文章
ini Python日志配置(到控制台和文件,改编自python-guide.org)
python 将数据从提供的位置备份到另一个位置的压缩文件。解决方案改编自使用Python自动化无聊的东西。
python 将数据从提供的位置备份到另一个位置的压缩文件。解决方案改编自使用Python自动化无聊的东西。
python 移动pytorch的mnist数据集,改编自https://gist.github.com/tencia/afb129122a64bde3bd0c