通过代码从 Apache Beam 应用程序向 Google Cloud 进行身份验证
Posted
技术标签:
【中文标题】通过代码从 Apache Beam 应用程序向 Google Cloud 进行身份验证【英文标题】:Authenticating with Google Cloud from Apache Beam application via code 【发布时间】:2021-10-16 21:32:22 【问题描述】:我正在尝试在Kinesis Data Analytics 中运行一个使用 Apache Flink 作为运行时的 Apache Beam 应用程序。管道使用PubsubIO 连接器。我正在尝试authenticate with Google Cloud using code,因为 Kinesis Data Analytics 不允许导出环境变量,所以导出GOOGLE_APPLICATION_CREDENTIALS 环境变量似乎不是一种选择。
我正在尝试使用以下代码进行身份验证。
GoogleCredentials credential = GoogleCredentials
.fromStream(credentialJsonInputStream)
.createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");
credential.refreshIfExpired();
options.setGcpCredential(credential);
此处的选项引用继承PubsubOptions。
但是在运行应用程序时它会失败并出现异常:
线程“main”中的异常 org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 禁止发帖 https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish “代码”:403,“错误”:[ “域”:“全球”, "message" : "请求缺少有效的 API 密钥。", “原因”:“禁止”],“消息”:“请求缺少有效的 API 密钥。”,“状态”:“PERMISSION_DENIED” 在 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371) 在 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) 在 org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) 在 org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) 在 com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)
在调试时我注意到传递给org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClient 的PubsubOptions
引用在调用GcpOptions#getGcpCredential
时返回null
我非常感谢有关如何在这种情况下进行身份验证的任何见解。
【问题讨论】:
【参考方案1】:GcpOptions#setGcpCredential 选项不能与 Flink runner 一起使用,因为 Flink runner 序列化 PipelineOptions,但 getGcpCredential 带有 @JsonIgnore 注解。
当没有通过GcpOptions#setGcpCredential 明确设置凭据时, Pub/Sub 等 GCP 服务使用基于当前设置的 GcpOptions#credentialFactoryClass 的凭据。
因此,我们可以定义一个自定义的GcpCredentialFactory
类,而不是调用options.setGcpCredential(credential)
。然后传给GcpOptions#credentialFactoryClass
options.setCredentialFactoryClass(CustomGcpCredentialFactory.class);
您的应用程序的PipelineOptions
接口需要扩展GcpOptions
接口,以便您能够在options
引用上调用上述方法。
public class CustomCredentialFactory extends GcpCredentialFactory
private static CustomCredentialFactory INSTANCE = new CustomCredentialFactory();
private CustomCredentialFactory(PipelineOptions o)
/**
* Required by GcpOptions.GcpUserCredentialsFactory#create(org.apache.beam.sdk.options.PipelineOptions)
*/
public static CustomCredentialFactory fromOptions(PipelineOptions o)
return new CustomCredentialFactory(o);
@Override
public Credentials getCredential()
try
// Load the GCP credential file (from S3, Jar, ..)
InputStream credentialFileInputStream = SomeUtil.getCredentialInputStream();
return GoogleCredentials
.fromStream(credentialFileInputStream)
.createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");
catch (IOException e)
return null;
【讨论】:
以上是关于通过代码从 Apache Beam 应用程序向 Google Cloud 进行身份验证的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Beam 向 BigQuery 传播插入时如何指定 insertId
Apache Beam WordCount编程实战及源代码解读