Credentials error when using Google Transfer Service to transfer a file from AWS to GCP
Posted: 2018-04-20 23:34:40

Question: We have developed an automated pipeline that runs most of its tasks on AWS and then performs some downstream work on Google Cloud. The AWS tasks are deployed with AWS Step Functions/Lambda, and we need to hand the processed files off from AWS to Google Cloud Storage (via Google Transfer Service). However, I am having trouble wiring the AWS and GCP parts together.
I have a Lambda function that kicks off the Google Transfer Service job (via Google's Python client library), but I keep getting this error:
module initialization error: Could not automatically determine
credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or
explicitly create credential and re-run the application. For more
information, please see
https://developers.google.com/accounts/docs/application-default-credentials.
My S3-to-GCS Lambda function (which I call "handoff") has a helper that sets the GOOGLE_APPLICATION_CREDENTIALS environment variable, but it evidently isn't working.
Here is the handoff Lambda function:
"""
Creates a one-time transfer from Amazon S3 to Google Cloud Storage.
"""
import requests
import boto3
from botocore.client import ClientError
from requests.exceptions import RequestException
from google.cloud import storage
import googleapiclient.discovery
import logging
import uuid
import os
import base64
EVENT_GCP_CREDS_KEY = 'gcp_creds_b64_cypher_text'
GCP_CREDENTIALS_FILE_NAME = 'service_creds.json'
# Establish clients
kms = boto3.client('kms')
storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1')
def handler(event, context):
description = "-".join("transfer-job", event['queue'])
project_id = event['project_id']
source_bucket = event['results_uri'] + "final-cohort-vcf/"
sink_bucket = event['sink_bucket']
include_prefix = event['cohort_prefix'] + ".gt.vcf.gz"
access_key = event['aws_access_key']
secret_access_key = event['aws_secret_key']
return gcp_storage_read_op_verification(event)
now = datetime.datetime.now()
day = now.day
month = now.month
year = now.year
# Add 7 hours because the time has to be in UTC
hours = now.hour + 7
minutes = now.minute + 2
transfer_job =
'description': description,
'status': 'ENABLED',
'projectId': project_id,
'schedule':
'scheduleStartDate':
'day': day,
'month': month,
'year': year
,
'scheduleEndDate':
'day': day,
'month': month,
'year': year
,
'startTimeOfDay':
'hours': hours,
'minutes': minutes
,
'transferSpec':
'objectConditions':
'includePrefixes': [
include_prefix
]
,
'awsS3DataSource':
'bucketName': source_bucket,
'awsAccessKey':
'accessKeyId': access_key,
'secretAccessKey': secret_access_key
,
'gcsDataSink':
'bucketName': sink_bucket
result = storagetransfer.transferJobs().create(body=transfer_job).execute()
print('Returned transferJob: '.format(
json.dumps(result, indent=4)))
return result
def _gcp_credentials_hack(event):
    """
    A hack to enable GCP client usage via Application Default Credentials. Uses an encoded, encrypted string of the
    GCP service account JSON from the event object. Has the side effects of:
    1. Writing the credentials in plaintext to /tmp/service_creds.json
    2. Referencing this location in the GOOGLE_APPLICATION_CREDENTIALS environment variable
    :param event: Passed in by the container, expecting to find key: gcp_creds_b64_cypher_text
    :return: None
    """
    # Get blob from event
    cypher_text_blob = event['GCP_creds']
    # Decode cypher_text from base64, into bytes
    cypher_text_bytes = base64.b64decode(cypher_text_blob)
    # Call KMS to decrypt GCP credentials cypher text
    # Permissions for this action should be granted by a policy attached to this function's invocation role
    decrypt_response = kms.decrypt(CiphertextBlob=cypher_text_bytes)
    # Process the plaintext from the result object from kms.decrypt(..) into a utf-8 encoded bytes object
    gcp_credentials_bytes = decrypt_response['Plaintext']
    # Decode the utf-8 encoded bytes object into a Python str of the GCP credentials
    gcp_credentials = gcp_credentials_bytes.decode('utf-8')
    # Write the credentials in plaintext to /tmp
    # - Can we guarantee that only approved agents can read this file?
    #   YES. See the comment (AWS forum quote) in the gcp_storage_read_op_verification docstring below.
    gcp_credentials_file_local = os.path.sep.join(('/tmp', GCP_CREDENTIALS_FILE_NAME))
    logger.debug('writing credentials to {}'.format(gcp_credentials_file_local))
    with open(gcp_credentials_file_local, 'w') as credentials_fh:
        credentials_fh.write(gcp_credentials)
    # Set Application Default Credentials environment path to the tmp file we created
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = gcp_credentials_file_local
def _get_blob_from_gcs_public_landsat_bucket():
    """
    Create a Google Cloud Storage client and use it to return a blob from the public landsat image bucket.
    :return: blob retrieved from the public landsat bucket using the Google Cloud Storage client library
    """
    # Create GCP storage client instance, using Application Default Credentials (hack)
    client = storage.Client()
    # Use GCP storage client to get the public landsat bucket object
    public_landsat_bucket = client.get_bucket(bucket_name='gcp-public-data-landsat')
    # return the first blob in the bucket
    for blob in public_landsat_bucket.list_blobs():
        return blob


def _get_blob_from_hail_project_bucket():
    """
    Create a Google Cloud Storage client and use it to return a blob from the private hail project bucket.
    :return: blob retrieved from the 'liftover-results' bucket using the Google Cloud Storage client library
    """
    # Create GCP storage client instance, using Application Default Credentials (hack)
    client = storage.Client()
    # Use GCP storage client to get the private hail project bucket object
    hail_project_bucket = client.get_bucket(bucket_name='liftover-results')
    # return the first blob in the bucket
    for blob in hail_project_bucket.list_blobs():
        return blob
def gcp_storage_read_op_verification(event):
    """
    Verify that the GCP storage client is usable from within this Lambda:
    1. Bootstrap Application Default Credentials from the encrypted GCP service account JSON carried in the
       event object, by writing it in plaintext to /tmp.
       Appears to be safe. See https://forums.aws.amazon.com/message.jspa?messageID=761306
       "Thanks for reaching out to us. I believe what you are referring to is Lambda container re-usage. All the
       resources associated with a Lambda container, including the /tmp storage, are isolated from other Lambda
       containers.
       https://aws.amazon.com/blogs/compute/container-reuse-in-lambda/
       Indeed there is the possibility of reusing the same Lambda container if your function is executed at close
       time intervals. Important to note is that the container will only be reused for your particular Lambda
       function, other functions from your account or different accounts will run in other isolated containers.
       So there is a 1-to-1 association between a Lambda function and its container.
       After a certain time interval of a Lambda function not being invoked, its associated container is deleted
       and in the process all the data stored in the memory or disk associated with the container is destroyed as
       well."
       ...
    2. With the client library bootstrapped, use the storage client to retrieve the public landsat image
       bucket, and return a blob.
    3. Finally return that blob's id, as proof of the execution.
    :param event: event from handler that contains the encrypted GCP service credentials
    :return: first blob id from GCS public landsat bucket
    """
    # Use the encrypted GCP storage credentials from the event to enable file-based Application Default Credentials.
    # This is somewhat of a bad hack since they must exist in plaintext on a filesystem. Although, see above, this
    # is supposed to be private between containers.
    #
    # TODO: If using this hack often, it would be better to delete the plaintext creds file after the GCP API call,
    # possibly using a context manager. E.g.
    #
    #   with gcp_creds_hack:
    #       _get_blob
    #
    # Implemented like
    #
    #   from contextlib import contextmanager
    #
    #   @contextmanager
    #   def gcp_creds_hack(file):
    #       # write the plaintext creds file, file, to /tmp
    #       yield
    #       # delete the plaintext creds file, file, from /tmp
    #
    try:
        _gcp_credentials_hack(event=event)
        # If the credentials hack succeeded, these will return blobs from GCS buckets using the GCS python client
        landsat_blob = _get_blob_from_gcs_public_landsat_bucket()
        liftover_blob = _get_blob_from_hail_project_bucket()
        # return dict of blob.id for response jsonification
        return {
            "gcp-public-data-landsat-random-blob-id": landsat_blob.id,
            "gcp-private-hail-bucket-random-blob-id": liftover_blob.id,
        }
    except BaseException as e:
        # Very broad exception class; using it for the time being, until there is an opportunity to narrow it to
        # the fine-grained exceptions that can be raised from the statement suite
        logger.error(e)
        return FAIL_TOKEN
The GCP credentials passed in the event object originate from a local file path to the service account JSON file, supplied as pipeline user input. I read this JSON file and encrypt it with the following script:
import boto3
import base64
from botocore.client import ClientError


class LambdaModeUseCases(object):

    @staticmethod
    def encrypt_gcp_creds(gcp_creds_file, key_id):
        # get GCP creds as string
        with open(gcp_creds_file, 'r') as gcp_creds_fh:
            gcp_creds = gcp_creds_fh.read().replace('\n', '')
        # kms:Encrypt (need key_id or arn)
        kms_client = boto3.client('kms')
        encryption_response = {}
        try:
            encryption_response = kms_client.encrypt(
                KeyId=key_id,
                Plaintext=gcp_creds.encode('utf-8'),
            )
        except ClientError as ce:
            print('Failed calling kms.encrypt with key {key} on text {text}'.format(key=key_id, text=gcp_creds))
            raise ce
        print('Parsing kms_client.encrypt(..) response')
        cypher_text_blob = 'FAILED'
        if 'CiphertextBlob' in encryption_response:
            cypher_text_blob = encryption_response['CiphertextBlob']
        else:
            print('Was able to call kms.encrypt(..) without a botocore.client.ClientError.')
            print('But failed to find CiphertextBlob in response: {response}'.format(
                response=encryption_response
            ))
            raise Exception('Failed to find CiphertextBlob in kms.encrypt() response.')
        print('Base64 encoding...')
        encrypted_gcp_creds_b64 = base64.b64encode(cypher_text_blob)
        print('b64: {}'.format(encrypted_gcp_creds_b64))
        encrypted_gcp_creds_str = encrypted_gcp_creds_b64.decode("utf-8")
        print('str: {}'.format(encrypted_gcp_creds_str))
        return encrypted_gcp_creds_str
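For reference, a minimal sketch of how this helper might be invoked; the KMS key alias and the local file path are illustrative placeholders, not values from the actual pipeline:

# Hypothetical invocation of the encryption helper above; the key alias and the
# local path to the service account JSON are placeholder values.
encrypted_creds = LambdaModeUseCases.encrypt_gcp_creds(
    gcp_creds_file='/path/to/service_creds.json',
    key_id='alias/handoff-gcp-creds'
)
print(encrypted_creds)  # base64-encoded ciphertext, safe to embed in the Lambda event payload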
This encrypted credentials string is then passed to the handoff Lambda function in the event object.
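To make the hand-off concrete, here is a rough sketch of carrying that string in the event. The event keys match what handler() reads above; the values, the function name, and the direct boto3 invocation are assumptions for illustration (in the real pipeline the event is assembled by Step Functions):

# Illustrative only: invoke the handoff Lambda with the encrypted credentials in the event.
# All values and the function name are placeholders; the keys mirror handler().
import json
import boto3

event = {
    'queue': 'example-queue',
    'project_id': 'my-gcp-project',
    'results_uri': 'results-bucket/run-001/',
    'sink_bucket': 'my-gcs-sink-bucket',
    'cohort_prefix': 'cohort-001',
    'aws_access_key': 'AKIA...',
    'aws_secret_key': '...',
    'GCP_creds': encrypted_creds,  # output of encrypt_gcp_creds() above
}

lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
    FunctionName='handoff',  # placeholder function name
    Payload=json.dumps(event).encode('utf-8'),
)
print(json.loads(response['Payload'].read()))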
Any ideas about what is going wrong, or about a simpler way to do this?
Answer 1: Try this:
Save your Google service account credentials JSON file in the same directory as the Python script that contains your Lambda code, and have the script call:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './yourcred.json'
Zip the script and the JSON file together (create a deployment package).
In the AWS Lambda console, create a Lambda function and upload the ZIP file rather than using a blueprint. There are examples of doing this, e.g. https://gist.github.com/cyk/8ec6481d3dcbe10376f8.
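A minimal sketch of what this looks like for the handoff function, assuming the service account file is bundled in the deployment package as service_creds.json next to the handler module; the key point is that the environment variable is set at module level, before any Google client is constructed:

# Sketch of the bundled-credentials approach (assumes service_creds.json is packaged
# in the deployment ZIP alongside this module).
import os
import googleapiclient.discovery

# Point Application Default Credentials at the bundled file *before* building any
# Google client; the question's code builds its client at module-import time.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'service_creds.json')

storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1')


def handler(event, context):
    # Build transfer_job exactly as in the question's handler, then:
    # return storagetransfer.transferJobs().create(body=transfer_job).execute()
    pass

Keep in mind this bakes a plaintext service account key into the deployment artifact, so the same security caveats as the question's /tmp hack apply.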
Answer 2: If you have the credentials in a JSON file, there is no need to set the environment variable; instead, provide the service account credentials explicitly (rather than relying on default credentials). You can run the following code:
from googleapiclient import discovery
from google.oauth2 import service_account
SERVICE_ACCOUNT_FILE = 'service.json'
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE)
storagetransfer = discovery.build('storagetransfer', 'v1', credentials=credentials)
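Building on this answer, a hedged sketch of how the question's KMS-decrypted credentials could be handed straight to the client via service_account.Credentials.from_service_account_info, skipping both the /tmp file and the environment variable. The event key GCP_creds follows the question's handler; the helper name is made up:

# Sketch: build credentials directly from the decrypted JSON string, so nothing is
# written to disk. Assumes the same base64-encoded, KMS-encrypted blob under
# event['GCP_creds'] as in the question.
import base64
import json

import boto3
from google.oauth2 import service_account
from googleapiclient import discovery

kms = boto3.client('kms')


def build_storagetransfer_client(event):
    plaintext = kms.decrypt(
        CiphertextBlob=base64.b64decode(event['GCP_creds']))['Plaintext']
    credentials = service_account.Credentials.from_service_account_info(
        json.loads(plaintext.decode('utf-8')))
    return discovery.build('storagetransfer', 'v1', credentials=credentials)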
Comments:
Thanks for the suggestion! I have been trying it, but I keep getting the error: "Failed to obtain the location of the Google Cloud Storage (GCS) bucket pipeline-validation due to insufficient permissions. Please verify that the necessary permissions have been granted."
The service account definitely has access to the Storage Transfer resources, since I am a user of the service account and an "owner". Any ideas?
Can you verify that the permission conditions described in this are fully satisfied?