如何在 Python 中运行 Google Cloud Function 中的子进程

Posted

技术标签:

【中文标题】如何在 Python 中运行 Google Cloud Function 中的子进程【英文标题】:How to run a subprocess inside Google Cloud Function in Python 【发布时间】:2019-07-23 02:25:13 【问题描述】:

我试图在 GCP 函数中运行一个 bash 脚本,但不知何故它不起作用。 这是我的函数,它基本上将文件(代理)导出到 Google Apigee:

def test2(request):
    cmd = "python ./my-proxy/tools/deploy.py -n myProxy -u userName:!password -o myOrg -e test -d ./my-proxy -p /"
    # no block, it start a sub process.
    p = subprocess.Popen(cmd , shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # and you can block util the cmd execute finish
    p.wait()
    # or stdout, stderr = p.communicate()
    return "Proxy deployed to Apigee"

这是我的deploy.py 文件的样子:

!/usr/bin/env python

import base64
import getopt
import httplib
import json
import re
import os
import sys
import StringIO
import urlparse
import xml.dom.minidom
import zipfile


def httpCall(verb, uri, headers, body):
    if httpScheme == 'https':
        conn = httplib.HTTPSConnection(httpHost)
    else:
        conn = httplib.HTTPConnection(httpHost)

    if headers == None:
        hdrs = dict()
    else:
        hdrs = headers

    hdrs['Authorization'] = 'Basic %s' % base64.b64encode(UserPW)
    conn.request(verb, uri, body, hdrs)

    return conn.getresponse()


def getElementText(n):
    c = n.firstChild
    str = StringIO.StringIO()

    while c != None:
        if c.nodeType == xml.dom.Node.TEXT_NODE:
            str.write(c.data)
        c = c.nextSibling

    return str.getvalue().strip()


def getElementVal(n, name):
    c = n.firstChild

    while c != None:
        if c.nodeName == name:
            return getElementText(c)
        c = c.nextSibling

    return None


# Return TRUE if any component of the file path contains a directory name that
# starts with a "." like '.svn', but not '.' or '..'
def pathContainsDot(p):
    c = re.compile('\.\w+')

    for pc in p.split('/'):
        if c.match(pc) != None:
            return True

    return False


def getDeployments():
    # Print info on deployments
    hdrs = 'Accept': 'application/xml'
    resp = httpCall('GET',
            '/v1/organizations/%s/apis/%s/deployments' \
                % (Organization, Name),
            hdrs, None)

    if resp.status != 200:
        return None

    ret = list()
    deployments = xml.dom.minidom.parse(resp)
    environments = deployments.getElementsByTagName('Environment')

    for env in environments:
        envName = env.getAttribute('name')
        revisions = env.getElementsByTagName('Revision')
        for rev in revisions:
            revNum = int(rev.getAttribute('name'))
            error = None
            state = getElementVal(rev, 'State')
            basePaths = rev.getElementsByTagName('BasePath')

            if len(basePaths) > 0:
                basePath = getElementText(basePaths[0])
            else:
                basePath = 'unknown'

            # svrs = rev.getElementsByTagName('Server')
            status = 'environment': envName,
                    'revision': revNum,
                    'basePath': basePath,
                    'state': state

            if error != None:
                status['error'] = error

            ret.append(status)

    return ret


def printDeployments(dep):
    for d in dep:
        print 'Environment: %s' % d['environment']
        print '  Revision: %i BasePath = %s' % (d['revision'], d['basePath'])
        print '  State: %s' % d['state']
        if 'error' in d:
            print '  Error: %s' % d['error']

ApigeeHost = 'https://api.enterprise.apigee.com'
UserPW = None
Directory = None
Organization = None
Environment = None
Name = None
BasePath = '/'
ShouldDeploy = True

Options = 'h:u:d:e:n:p:o:i:z:'

opts = getopt.getopt(sys.argv[1:], Options)[0]

for o in opts:
    if o[0] == '-n':
        Name = o[1]
    elif o[0] == '-o':
        Organization = o[1]
    elif o[0] == '-h':
        ApigeeHost = o[1]
    elif o[0] == '-d':
        Directory = o[1]
    elif o[0] == '-e':
        Environment = o[1]
    elif o[0] == '-p':
        BasePath = o[1]
    elif o[0] == '-u':
        UserPW = o[1]
    elif o[0] == '-i':
        ShouldDeploy = False
    elif o[0] == '-z':
        ZipFile = o[1]

if UserPW == None or \
        (Directory == None and ZipFile == None) or \
        Environment == None or \
        Name == None or \
        Organization == None:
    print """Usage: deploy -n [name] (-d [directory name] | -z [zipfile])
              -e [environment] -u [username:password] -o [organization]
              [-p [base path] -h [apigee API url] -i]
    base path defaults to "/"
    Apigee URL defaults to "https://api.enterprise.apigee.com"
    -i denotes to import only and not actually deploy
    """
    sys.exit(1)

url = urlparse.urlparse(ApigeeHost)
httpScheme = url[0]
httpHost = url[1]

body = None

if Directory != None:
    # Construct a ZIPped copy of the bundle in memory
    tf = StringIO.StringIO()
    zipout = zipfile.ZipFile(tf, 'w')

    dirList = os.walk(Directory)
    for dirEntry in dirList:
        if not pathContainsDot(dirEntry[0]):
            for fileEntry in dirEntry[2]:
                if not fileEntry.endswith('~'):
                    fn = os.path.join(dirEntry[0], fileEntry)
                    en = os.path.join(
                            os.path.relpath(dirEntry[0], Directory),
                            fileEntry)
                    print 'Writing %s to %s' % (fn, en)
                    zipout.write(fn, en)

    zipout.close()
    body = tf.getvalue()
elif ZipFile != None:
    f = open(ZipFile, 'r')
    body = f.read()
    f.close()

# Upload the bundle to the API
hdrs = 'Content-Type': 'application/octet-stream',
        'Accept': 'application/json'
uri = '/v1/organizations/%s/apis?action=import&name=%s' % \
            (Organization, Name)
resp = httpCall('POST', uri, hdrs, body)

if resp.status != 200 and resp.status != 201:
    print 'Import failed to %s with status %i:\n%s' % \
            (uri, resp.status, resp.read())
    sys.exit(2)

deployment = json.load(resp)
revision = int(deployment['revision'])

print 'Imported new proxy version %i' % revision

if ShouldDeploy:
    # Undeploy duplicates
    deps = getDeployments()
    for d in deps:
        if d['environment'] == Environment and \
            d['basePath'] == BasePath and \
            d['revision'] != revision:
            print 'Undeploying revision %i in same environment and path:' % \
                    d['revision']
            conn = httplib.HTTPSConnection(httpHost)
            resp = httpCall('POST',
                    ('/v1/organizations/%s/apis/%s/deployments' +
                            '?action=undeploy' +
                            '&env=%s' +
                            '&revision=%i') % \
                        (Organization, Name, Environment, d['revision']),
                 None, None)
            if resp.status != 200 and resp.status != 204:
                print 'Error %i on undeployment:\n%s' % \
                        (resp.status, resp.read())

    # Deploy the bundle
    hdrs = 'Accept': 'application/json'
    resp = httpCall('POST',
        ('/v1/organizations/%s/apis/%s/deployments' +
                '?action=deploy' +
                '&env=%s' +
                '&revision=%i' +
                '&basepath=%s') % \
            (Organization, Name, Environment, revision, BasePath),
        hdrs, None)

    if resp.status != 200 and resp.status != 201:
        print 'Deploy failed with status %i:\n%s' % (resp.status, resp.read())
        sys.exit(2)

deps = getDeployments()
printDeployments(deps)

当我在我的机器上本地运行时,这有效,但在 GCP 上无效。不知道是否与我使用此功能连接到 Google Apigee 的事实有关。奇怪的是 GCP 上的日志没有显示任何错误,但是我没有将代理导出到 Apigee。

感谢帮助!

更新: 尝试使用subprocess.check_output(),正如这里的一些人所鼓励的那样:

def test(request):
    output = None
    try:
        output = subprocess.check_output([
        "./my-proxy/tools/deploy.py", 
        '-n',  'myProxy',
        '-u', 'myUserName:myPassword',
        '-o', 'myOrgName',
        '-e', 'test',
        '-d', './my-proxy',
        '-p', '/'])

    except:
        print(output)    

    return output 

仍然无法在 GCP 上工作。就像我之前提到的,它在我的机器中就像一个魅力(以上两种解决方案),但在 GCP 中却没有。 如下图所示,我在从 GCP 执行 deploy.py 后得到 200,但我的文件没有转到 Apigee:

GCP 日志也没有显示任何错误:

【问题讨论】:

您是否考虑过在 python 函数本身中简单地调用 python 代码,而不是尝试启动子进程? 感谢@DougStevenson 的回复,我对Python 还是很陌生,如果不要求太多,您可以举个例子吗?干杯 我没有例子。我只是从未听说过有人试图在 Cloud Functions 中对另一个 python 进程进行子处理。也许它只是不起作用,并且从未打算起作用。 不用担心@DougStevenson 无论如何感谢您的帮助。会跟上,看看能不能解决这个问题。 设想您的函数被部署到一个容器中,该容器主要包括(在这种情况下)一个 Python 运行时并且包括一个 shell。 【参考方案1】:

这是可能的!

python 可执行文件未在 Cloud Function 运行时中安装或链接,但 python3 已安装或链接。因此,有几种方法可以解决这个问题:

    指定python3作为要运行的程序:"python3 ./my-proxy/tools/deploy.py ...";

    deploy.py 脚​​本中添加#! 运算符:#!/usr/bin/env python3

    将python解释器指定为Popen。您可以使用sys.executable 来引用当前使用的可执行文件:

     process = subprocess.Popen(
         [
             sys.executable,
             "./deploy.py",
             "-n",
             "myProxy",
             "-u",
             "myUserName:myPassword",
             "-o",
             "myOrgName",
             "-e",
             "test",
             "-d",
             "./my-proxy",
             "-p",
             "/",
         ],
         stdout=subprocess.PIPE,
         stderr=subprocess.PIPE,
         universal_newlines=True,
     )
    

您没有看到错误,因为它是在子进程中生成的,打印到其 stderr,随后由您的程序使用 process.communicate()process.check_output(...) 捕获,但未打印。要查看您遇到的错误,您可以打印出 stdout 和 stderr 的内容:

    out, err = process.communicate()
    log.debug("returncode = %s", process.returncode)
    log.debug("stdout = %s", out)
    log.debug("stderr = %s", err)

在github上查看我们用于分析、重现和解决您的问题的源代码

【讨论】:

以上是关于如何在 Python 中运行 Google Cloud Function 中的子进程的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Python 中运行 Google Cloud Function 中的子进程

如何在 Google Colab 笔记本的“.py”文件中运行 Python 脚本?

如何使用GAE收听Google表格,然后运行一组python代码?

如何获得Google Cloud的所有docker-machine映像列表

如何使用 Python 运行 Google gsutil

无法使用 Python 连接到 BigQuery - ServiceUnavailable