如何在 Python 中运行 Google Cloud Function 中的子进程
Posted
技术标签:
【中文标题】如何在 Python 中运行 Google Cloud Function 中的子进程【英文标题】:How to run a subprocess inside Google Cloud Function in Python 【发布时间】:2019-07-23 02:25:13 【问题描述】:我试图在 GCP 函数中运行一个 bash 脚本,但不知何故它不起作用。 这是我的函数,它基本上将文件(代理)导出到 Google Apigee:
def test2(request):
cmd = "python ./my-proxy/tools/deploy.py -n myProxy -u userName:!password -o myOrg -e test -d ./my-proxy -p /"
# no block, it start a sub process.
p = subprocess.Popen(cmd , shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# and you can block util the cmd execute finish
p.wait()
# or stdout, stderr = p.communicate()
return "Proxy deployed to Apigee"
这是我的deploy.py
文件的样子:
!/usr/bin/env python
import base64
import getopt
import httplib
import json
import re
import os
import sys
import StringIO
import urlparse
import xml.dom.minidom
import zipfile
def httpCall(verb, uri, headers, body):
if httpScheme == 'https':
conn = httplib.HTTPSConnection(httpHost)
else:
conn = httplib.HTTPConnection(httpHost)
if headers == None:
hdrs = dict()
else:
hdrs = headers
hdrs['Authorization'] = 'Basic %s' % base64.b64encode(UserPW)
conn.request(verb, uri, body, hdrs)
return conn.getresponse()
def getElementText(n):
c = n.firstChild
str = StringIO.StringIO()
while c != None:
if c.nodeType == xml.dom.Node.TEXT_NODE:
str.write(c.data)
c = c.nextSibling
return str.getvalue().strip()
def getElementVal(n, name):
c = n.firstChild
while c != None:
if c.nodeName == name:
return getElementText(c)
c = c.nextSibling
return None
# Return TRUE if any component of the file path contains a directory name that
# starts with a "." like '.svn', but not '.' or '..'
def pathContainsDot(p):
c = re.compile('\.\w+')
for pc in p.split('/'):
if c.match(pc) != None:
return True
return False
def getDeployments():
# Print info on deployments
hdrs = 'Accept': 'application/xml'
resp = httpCall('GET',
'/v1/organizations/%s/apis/%s/deployments' \
% (Organization, Name),
hdrs, None)
if resp.status != 200:
return None
ret = list()
deployments = xml.dom.minidom.parse(resp)
environments = deployments.getElementsByTagName('Environment')
for env in environments:
envName = env.getAttribute('name')
revisions = env.getElementsByTagName('Revision')
for rev in revisions:
revNum = int(rev.getAttribute('name'))
error = None
state = getElementVal(rev, 'State')
basePaths = rev.getElementsByTagName('BasePath')
if len(basePaths) > 0:
basePath = getElementText(basePaths[0])
else:
basePath = 'unknown'
# svrs = rev.getElementsByTagName('Server')
status = 'environment': envName,
'revision': revNum,
'basePath': basePath,
'state': state
if error != None:
status['error'] = error
ret.append(status)
return ret
def printDeployments(dep):
for d in dep:
print 'Environment: %s' % d['environment']
print ' Revision: %i BasePath = %s' % (d['revision'], d['basePath'])
print ' State: %s' % d['state']
if 'error' in d:
print ' Error: %s' % d['error']
ApigeeHost = 'https://api.enterprise.apigee.com'
UserPW = None
Directory = None
Organization = None
Environment = None
Name = None
BasePath = '/'
ShouldDeploy = True
Options = 'h:u:d:e:n:p:o:i:z:'
opts = getopt.getopt(sys.argv[1:], Options)[0]
for o in opts:
if o[0] == '-n':
Name = o[1]
elif o[0] == '-o':
Organization = o[1]
elif o[0] == '-h':
ApigeeHost = o[1]
elif o[0] == '-d':
Directory = o[1]
elif o[0] == '-e':
Environment = o[1]
elif o[0] == '-p':
BasePath = o[1]
elif o[0] == '-u':
UserPW = o[1]
elif o[0] == '-i':
ShouldDeploy = False
elif o[0] == '-z':
ZipFile = o[1]
if UserPW == None or \
(Directory == None and ZipFile == None) or \
Environment == None or \
Name == None or \
Organization == None:
print """Usage: deploy -n [name] (-d [directory name] | -z [zipfile])
-e [environment] -u [username:password] -o [organization]
[-p [base path] -h [apigee API url] -i]
base path defaults to "/"
Apigee URL defaults to "https://api.enterprise.apigee.com"
-i denotes to import only and not actually deploy
"""
sys.exit(1)
url = urlparse.urlparse(ApigeeHost)
httpScheme = url[0]
httpHost = url[1]
body = None
if Directory != None:
# Construct a ZIPped copy of the bundle in memory
tf = StringIO.StringIO()
zipout = zipfile.ZipFile(tf, 'w')
dirList = os.walk(Directory)
for dirEntry in dirList:
if not pathContainsDot(dirEntry[0]):
for fileEntry in dirEntry[2]:
if not fileEntry.endswith('~'):
fn = os.path.join(dirEntry[0], fileEntry)
en = os.path.join(
os.path.relpath(dirEntry[0], Directory),
fileEntry)
print 'Writing %s to %s' % (fn, en)
zipout.write(fn, en)
zipout.close()
body = tf.getvalue()
elif ZipFile != None:
f = open(ZipFile, 'r')
body = f.read()
f.close()
# Upload the bundle to the API
hdrs = 'Content-Type': 'application/octet-stream',
'Accept': 'application/json'
uri = '/v1/organizations/%s/apis?action=import&name=%s' % \
(Organization, Name)
resp = httpCall('POST', uri, hdrs, body)
if resp.status != 200 and resp.status != 201:
print 'Import failed to %s with status %i:\n%s' % \
(uri, resp.status, resp.read())
sys.exit(2)
deployment = json.load(resp)
revision = int(deployment['revision'])
print 'Imported new proxy version %i' % revision
if ShouldDeploy:
# Undeploy duplicates
deps = getDeployments()
for d in deps:
if d['environment'] == Environment and \
d['basePath'] == BasePath and \
d['revision'] != revision:
print 'Undeploying revision %i in same environment and path:' % \
d['revision']
conn = httplib.HTTPSConnection(httpHost)
resp = httpCall('POST',
('/v1/organizations/%s/apis/%s/deployments' +
'?action=undeploy' +
'&env=%s' +
'&revision=%i') % \
(Organization, Name, Environment, d['revision']),
None, None)
if resp.status != 200 and resp.status != 204:
print 'Error %i on undeployment:\n%s' % \
(resp.status, resp.read())
# Deploy the bundle
hdrs = 'Accept': 'application/json'
resp = httpCall('POST',
('/v1/organizations/%s/apis/%s/deployments' +
'?action=deploy' +
'&env=%s' +
'&revision=%i' +
'&basepath=%s') % \
(Organization, Name, Environment, revision, BasePath),
hdrs, None)
if resp.status != 200 and resp.status != 201:
print 'Deploy failed with status %i:\n%s' % (resp.status, resp.read())
sys.exit(2)
deps = getDeployments()
printDeployments(deps)
当我在我的机器上本地运行时,这有效,但在 GCP 上无效。不知道是否与我使用此功能连接到 Google Apigee 的事实有关。奇怪的是 GCP 上的日志没有显示任何错误,但是我没有将代理导出到 Apigee。
感谢帮助!
更新:
尝试使用subprocess.check_output()
,正如这里的一些人所鼓励的那样:
def test(request):
output = None
try:
output = subprocess.check_output([
"./my-proxy/tools/deploy.py",
'-n', 'myProxy',
'-u', 'myUserName:myPassword',
'-o', 'myOrgName',
'-e', 'test',
'-d', './my-proxy',
'-p', '/'])
except:
print(output)
return output
仍然无法在 GCP 上工作。就像我之前提到的,它在我的机器中就像一个魅力(以上两种解决方案),但在 GCP 中却没有。
如下图所示,我在从 GCP 执行 deploy.py
后得到 200,但我的文件没有转到 Apigee:
GCP 日志也没有显示任何错误:
【问题讨论】:
您是否考虑过在 python 函数本身中简单地调用 python 代码,而不是尝试启动子进程? 感谢@DougStevenson 的回复,我对Python 还是很陌生,如果不要求太多,您可以举个例子吗?干杯 我没有例子。我只是从未听说过有人试图在 Cloud Functions 中对另一个 python 进程进行子处理。也许它只是不起作用,并且从未打算起作用。 不用担心@DougStevenson 无论如何感谢您的帮助。会跟上,看看能不能解决这个问题。 设想您的函数被部署到一个容器中,该容器主要包括(在这种情况下)一个 Python 运行时并且不包括一个 shell。 【参考方案1】:这是可能的!
python
可执行文件未在 Cloud Function 运行时中安装或链接,但 python3
已安装或链接。因此,有几种方法可以解决这个问题:
指定python3
作为要运行的程序:"python3 ./my-proxy/tools/deploy.py ..."
;
在deploy.py
脚本中添加#!
运算符:#!/usr/bin/env python3
;
将python解释器指定为Popen
。您可以使用sys.executable
来引用当前使用的可执行文件:
process = subprocess.Popen(
[
sys.executable,
"./deploy.py",
"-n",
"myProxy",
"-u",
"myUserName:myPassword",
"-o",
"myOrgName",
"-e",
"test",
"-d",
"./my-proxy",
"-p",
"/",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
您没有看到错误,因为它是在子进程中生成的,打印到其 stderr,随后由您的程序使用 process.communicate()
或 process.check_output(...)
捕获,但未打印。要查看您遇到的错误,您可以打印出 stdout 和 stderr 的内容:
out, err = process.communicate()
log.debug("returncode = %s", process.returncode)
log.debug("stdout = %s", out)
log.debug("stderr = %s", err)
在github上查看我们用于分析、重现和解决您的问题的源代码
【讨论】:
以上是关于如何在 Python 中运行 Google Cloud Function 中的子进程的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Python 中运行 Google Cloud Function 中的子进程
如何在 Google Colab 笔记本的“.py”文件中运行 Python 脚本?
如何使用GAE收听Google表格,然后运行一组python代码?