无法通过 KafkaIO 在光束中读取卡夫卡

Posted

技术标签:

【中文标题】无法通过 KafkaIO 在光束中读取卡夫卡【英文标题】:Can't read from kafka by KafkaIO in beam 【发布时间】:2019-05-25 03:57:44 【问题描述】:

我在 Apchea Beam 中编写了一个非常简单的管道,如下所示从我在 Confluent Cloud 上的 kafka 集群中读取数据,如下所示:

        Pipeline pipeline = Pipeline.create(options);

        Map<String, Object> propertyBuilder = new HashMap();
        propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
        propertyBuilder.put("sasl.mechanism","PLAIN");
        propertyBuilder.put("request.timeout.ms","20000");
        propertyBuilder.put("retry.backoff.ms","500");

        pipeline
            .apply(KafkaIO.<byte[], byte[]>readBytes()
               .withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
               .withTopic("gcp-ingestion-1")  
               .withKeyDeserializer(ByteArrayDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .updateConsumerProperties(propertyBuilder)             
               .withoutMetadata() // PCollection<KV<Long, String>>
            ) .apply(Values.<byte[]>create());

但是,当运行上面的代码从我的 kafka 集群中读取数据时,我遇到了异常

我在直接 java runner 上运行,我使用的是 beam 2.8,

我可以读取并生成消息到我的 kafka 融合集群,但不能通过上述代码。

【问题讨论】:

【参考方案1】:

如果您跟踪堆栈跟踪,代码似乎会尝试将超时配置属性强制转换为Integer:https://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L112

但是它得到了一个字符串。我的猜测是,这是因为您在此处将其设置为字符串:propertyBuilder.put("request.timeout.ms","20000")。我认为正确的做法是将其设置为Integer,例如比如propertyBuilder.put("request.timeout.ms", 20000)(超时值周围没有引号)。

您也可能对其他配置属性有类似的问题(例如重试退避),您需要仔细检查属性类型。

【讨论】:

以上是关于无法通过 KafkaIO 在光束中读取卡夫卡的主要内容,如果未能解决你的问题,请参考以下文章

消费者。如何指定要读取的分区? [卡夫卡]

卡夫卡KAFKA

卡夫卡连接 |由于操作冲突,无法完成请求

IBM AIX - 阿帕奇卡夫卡

无法通过KafkaIO在kafka读取梁

如何暂停卡夫卡消费者?