无法通过KafkaIO在kafka读取梁

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了无法通过KafkaIO在kafka读取梁相关的知识,希望对你有一定的参考价值。

我在Apache Beam中编写了一个非常简单的管道,如下所示:在汇合处从我的kafka集群读取数据如下:

        Pipeline pipeline = Pipeline.create(options);

        Map<String, Object> propertyBuilder = new HashMap();
        propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
        propertyBuilder.put("sasl.mechanism","PLAIN");
        propertyBuilder.put("request.timeout.ms","20000");
        propertyBuilder.put("retry.backoff.ms","500");

        pipeline
            .apply(KafkaIO.<byte[], byte[]>readBytes()
               .withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
               .withTopic("gcp-ingestion-1")  
               .withKeyDeserializer(ByteArrayDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .updateConsumerProperties(propertyBuilder)             
               .withoutMetadata() // PCollection<KV<Long, String>>
            ) .apply(Values.<byte[]>create());

但是,当我运行上面的代码来从我的kafka集群中读取数据时,我会遇到异常

我在直接java跑步者上面跑,我使用的是梁2.8,

我可以阅读并向我的kafka融合群集发送消息,但不能通过上面的代码。

答案

如果您遵循堆栈跟踪,则代码会尝试将超时配置属性强制转换为Integerhttps://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L112

但相反,它得到一个字符串。我的猜测是因为你在这里设置为字符串:propertyBuilder.put("request.timeout.ms","20000")。我认为正确的做法是将其设置为Integer,例如像propertyBuilder.put("request.timeout.ms", 20000)(超时值周围没有引号)。

您也可能遇到与其他配置属性类似的问题(例如,重试退避),您需要仔细检查属性类型。

以上是关于无法通过KafkaIO在kafka读取梁的主要内容,如果未能解决你的问题,请参考以下文章

在 Dataflow 上运行的 Apache Beam 管道无法从 KafkaIO 读取:SSL 握手失败

Kafka 集群丢失或重复消息

无法在kafka-storm中将偏移数据写入zookeeper

OpenGL、GLSL 片段着色器无法读取 Sampler2D 纹理

Kafka-文件管理

GLSL:无法从 FBO 读取纹理并使用片段着色器渲染到另一个 FBO