Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]
Posted
技术标签:
【中文标题】Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]【英文标题】:Kafka HDFS Sink Connector error.[Top level type must be STRUCT..] 【发布时间】:2021-09-10 15:26:34 【问题描述】:我正在使用 Kafka connect 测试 2.7 版本的 Kafka, 我面临着我不明白的问题。
我首先使用如下配置启动分布式连接器。
bootstrap.servers=..:9092,...:9092, ...
group.id=kafka-connect-test
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
... some internal topic configuration
plugin.path=<plugin path>
此连接器使用 8083 端口提供服务。
我想在 HDFS 上使用 snappy 编解码器编写 ORC 格式数据。 所以我用带有 json 数据的 REST API 制作了新的 Kafka HDFS 连接器,如下所示。 并且我不使用架构注册表。
curl -X POST <connector url:8083> \
-H Accept: application/json \
-H Content-Type: application/json \
-d
"name": "hdfs-sinkconnect-test",
"config":
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"store.url": "hdfs:~",
"hadoop.conf.dir": "<my hadoop.conf dir>",
"hadoop.home": "<hadoop home dir>",
"tasks.max": "5",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"format.class": "io.confluent.connect.hdfs.orc.OrcFormat",
"flush.size": 1000,
"avro.codec": "snappy",
"topics": "<topic name>",
"topics.dir": "/tmp/connect-logs",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"locale": "ko_KR",
"timezone": "Asia/Seoul",
"partition.duration.ms": "3600000",
"path.format": "'hour'=YYYYMMddHH/"
然后我有这样的错误消息。
# connectDistributed.out
[2021-06-28 17:14:11,596] ERROR Exception on topic partition <topic name>-<partition number>: (io.confluent.connect.hdfs.TopicPartitionWriter:409)
org.apache.kafka.connect.errors.ConnectException: Top level type must be STRUCT but was bytes
at io.confluent.connect.hdfs.orc.OrcRecordWriterProvider$1.write(OrcRecordWriterProvider.java:98)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:742)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:385)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:333)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:126)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我认为此错误消息与架构信息有关。 Schema Registry 对 Kafka 连接器是否必不可少? 解决此错误消息的任何想法或解决方案?谢谢。
【问题讨论】:
【参考方案1】:编写 ORC 文件需要 Struct 类型。
provided by Confluent 选项包括纯 JSON、JSONSchema、Avro 或 Protobuf。唯一不需要注册表的选项是普通的JsonConverter
请注意,key.deserializer
和 value.deserializer
不是有效的连接属性。您需要参考您的 key.converter
和 value.converter
属性
如果您不愿意修改转换器,您可以尝试使用HoistField
transformer 创建一个 Struct,这将创建一个只有一个字段的架构的 ORC 文件
【讨论】:
非常感谢。我会按照你说的测试一下。以上是关于Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]的主要内容,如果未能解决你的问题,请参考以下文章
Kafka HDFS Sink Connector Protobuf 未写入
Confluent Kafka Connect HDFS Sink 连接器延迟
kafka connect - 使用 hdfs sink 连接器进行 ExtractTopic 转换抛出 NullPointerException
即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败