从 Apache Beam(GCP 数据流)写入 ConfluentCloud

Posted

技术标签:

【中文标题】从 Apache Beam(GCP 数据流)写入 ConfluentCloud【英文标题】:Write to ConfluentCloud from Apache Beam (GCP Dataflow) 【发布时间】:2020-05-01 00:47:48 【问题描述】:

我正在尝试使用以下命令从 Dataflow (Apache Beam) 写入 Confluent Cloud/Kafka:

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
                                .withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
                                .withTopic("testtopic").withKeySerializer(StringSerializer.class)
                                .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

Map&lt;String, Object&gt; props = new HashMap&lt;&gt;(); 的位置(即现在为空)

在日志中,我得到:send failed : 'Topic testtopic not present in metadata after 60000 ms.'

此集群上确实存在该主题 - 所以我的猜测是登录存在问题,这是有道理的,因为我找不到传递 APIKey 的方法。

我确实尝试了各种组合,通过上面的props 将我从 Confluent Cloud 获得的 APIKey/Secret 传递给身份验证,但我找不到有效的设置。

【问题讨论】:

“我确实尝试了各种组合来传递 APIKey/Secret” -> 你能更新你的问题以包括这些吗 ***.com/questions/53939658/… 显示了从 Beam 连接到 Confluent Cloud 的示例 - 它作为消费者,因此您需要针对适当的生产者配置进行更改,但属性应该相同 感谢@RobinMoffatt - 我尝试了与另一个答案中链接的参数类似的参数 - 也许我混淆了一些东西。我明天将尝试使用链接的答案,并在此处报告反馈。已经谢谢了! @RobinMoffatt 再次感谢指点,我找到了解决方案,见下文 【参考方案1】:

找到了解决方案,感谢问题下方@RobinMoffatt 的 cmets 中的指针

这是我现在的设置:

Map<String, Object> props = new HashMap<>()

props.put("ssl.endpoint.identification.algorithm", "https");
props.put("sasl.mechanism", "PLAIN");
props.put("request.timeout.ms", 20000);
props.put("retry.backoff.ms", 500);
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<APIKEY>\" password=\"<SECRET>\";");
props.put("security.protocol", "SASL_SSL");

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka-TESTTOPIC", KafkaIO.<String, String>write()
    .withBootstrapServers("<CLUSTER>.confluent.cloud:9092")
    .withTopic("test").withKeySerializer(StringSerializer.class)
    .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

我错的关键是sasl.jaas.config(注意最后的;!)

【讨论】:

以上是关于从 Apache Beam(GCP 数据流)写入 ConfluentCloud的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam GCP 在动态创建的目录中上传 Avro

如何从 GCP 存储桶中读取 Apache Beam 中的多个文件

在 mac zsh 终端上安装 apache-beam[gcp] 时出错 - “zsh: no match found: apache-beam[gcp]”

请求的身份验证范围不足 - GCP 上的 Dataflow/Apache Beam

Spring Cloud Dataflow 与 Apache Beam/GCP 数据流说明

GCP Dataflow + Apache Beam - 缓存问题