通过代码从 Apache Beam 应用程序向 Google Cloud 进行身份验证

Posted

技术标签:

【中文标题】通过代码从 Apache Beam 应用程序向 Google Cloud 进行身份验证【英文标题】:Authenticating with Google Cloud from Apache Beam application via code 【发布时间】:2021-10-16 21:32:22 【问题描述】:

我正在尝试在Kinesis Data Analytics 中运行一个使用 Apache Flink 作为运行时的 Apache Beam 应用程序。管道使用PubsubIO 连接器。我正在尝试authenticate with Google Cloud using code,因为 Kinesis Data Analytics 不允许导出环境变量,所以导出GOOGLE_APPLICATION_CREDENTIALS 环境变量似乎不是一种选择。

我正在尝试使用以下代码进行身份验证。

    GoogleCredentials credential = GoogleCredentials
            .fromStream(credentialJsonInputStream)
            .createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");
    credential.refreshIfExpired();

    options.setGcpCredential(credential);

此处的选项引用继承PubsubOptions。

但是在运行应用程序时它会失败并出现异常:

线程“main”中的异常 org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 禁止发帖 https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish “代码”:403,“错误”:[ “域”:“全球”, "message" : "请求缺少有效的 API 密钥。", “原因”:“禁止”],“消息”:“请求缺少有效的 API 密钥。”,“状态”:“PERMISSION_DENIED” 在 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371) 在 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) 在 org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) 在 org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) 在 com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)

在调试时我注意到传递给org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClient 的PubsubOptions 引用在调用GcpOptions#getGcpCredential 时返回null

我非常感谢有关如何在这种情况下进行身份验证的任何见解。

【问题讨论】:

【参考方案1】:

GcpOptions#setGcpCredential 选项不能与 Flink runner 一起使用,因为 Flink runner 序列化 PipelineOptions,但 getGcpCredential 带有 @JsonIgnore 注解。

当没有通过GcpOptions#setGcpCredential 明确设置凭据时, Pub/Sub 等 GCP 服务使用基于当前设置的 GcpOptions#credentialFactoryClass 的凭据。

因此,我们可以定义一个自定义的GcpCredentialFactory 类,而不是调用options.setGcpCredential(credential)。然后传给GcpOptions#credentialFactoryClass

options.setCredentialFactoryClass(CustomGcpCredentialFactory.class);

您的应用程序的PipelineOptions 接口需要扩展GcpOptions 接口,以便您能够在options 引用上调用上述方法。

public class CustomCredentialFactory extends GcpCredentialFactory 

  private static CustomCredentialFactory INSTANCE = new CustomCredentialFactory();

  private CustomCredentialFactory(PipelineOptions o)   

  /**
   * Required by GcpOptions.GcpUserCredentialsFactory#create(org.apache.beam.sdk.options.PipelineOptions)
   */
  public static CustomCredentialFactory fromOptions(PipelineOptions o) 

      return new CustomCredentialFactory(o);
   

  @Override
  public Credentials getCredential() 

      try 

          // Load the GCP credential file (from S3, Jar, ..)
          InputStream credentialFileInputStream = SomeUtil.getCredentialInputStream();

          return GoogleCredentials
                  .fromStream(credentialFileInputStream)
                  .createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");

       catch (IOException e) 
          return null;
      
  


【讨论】:

以上是关于通过代码从 Apache Beam 应用程序向 Google Cloud 进行身份验证的主要内容,如果未能解决你的问题,请参考以下文章

使用 Apache Beam 向 BigQuery 传播插入时如何指定 insertId

Apache Beam WordCount编程实战及源代码解读

Python Apache Beam 侧输入断言错误

什么是 Apache Beam? [关闭]

Python 上的 Apache Beam 将 beam.Map 调用相乘

尝试从 Apache Beam 中的选项获取值时出现空指针异常