从 Apache Beam(GCP 数据流)写入 ConfluentCloud
Posted
技术标签:
【中文标题】从 Apache Beam(GCP 数据流)写入 ConfluentCloud【英文标题】:Write to ConfluentCloud from Apache Beam (GCP Dataflow) 【发布时间】:2020-05-01 00:47:48 【问题描述】:我正在尝试使用以下命令从 Dataflow (Apache Beam) 写入 Confluent Cloud/Kafka:
kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
.withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
.withTopic("testtopic").withKeySerializer(StringSerializer.class)
.withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));
Map<String, Object> props = new HashMap<>();
的位置(即现在为空)
在日志中,我得到:send failed : 'Topic testtopic not present in metadata after 60000 ms.'
此集群上确实存在该主题 - 所以我的猜测是登录存在问题,这是有道理的,因为我找不到传递 APIKey 的方法。
我确实尝试了各种组合,通过上面的props
将我从 Confluent Cloud 获得的 APIKey/Secret 传递给身份验证,但我找不到有效的设置。
【问题讨论】:
“我确实尝试了各种组合来传递 APIKey/Secret” -> 你能更新你的问题以包括这些吗 ***.com/questions/53939658/… 显示了从 Beam 连接到 Confluent Cloud 的示例 - 它作为消费者,因此您需要针对适当的生产者配置进行更改,但属性应该相同 感谢@RobinMoffatt - 我尝试了与另一个答案中链接的参数类似的参数 - 也许我混淆了一些东西。我明天将尝试使用链接的答案,并在此处报告反馈。已经谢谢了! @RobinMoffatt 再次感谢指点,我找到了解决方案,见下文 【参考方案1】:找到了解决方案,感谢问题下方@RobinMoffatt 的 cmets 中的指针
这是我现在的设置:
Map<String, Object> props = new HashMap<>()
props.put("ssl.endpoint.identification.algorithm", "https");
props.put("sasl.mechanism", "PLAIN");
props.put("request.timeout.ms", 20000);
props.put("retry.backoff.ms", 500);
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<APIKEY>\" password=\"<SECRET>\";");
props.put("security.protocol", "SASL_SSL");
kafkaKnowledgeGraphKVRecords.apply("Write to Kafka-TESTTOPIC", KafkaIO.<String, String>write()
.withBootstrapServers("<CLUSTER>.confluent.cloud:9092")
.withTopic("test").withKeySerializer(StringSerializer.class)
.withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));
我错的关键是sasl.jaas.config
(注意最后的;
!)
【讨论】:
以上是关于从 Apache Beam(GCP 数据流)写入 ConfluentCloud的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam GCP 在动态创建的目录中上传 Avro
如何从 GCP 存储桶中读取 Apache Beam 中的多个文件
在 mac zsh 终端上安装 apache-beam[gcp] 时出错 - “zsh: no match found: apache-beam[gcp]”
请求的身份验证范围不足 - GCP 上的 Dataflow/Apache Beam