Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]

Posted

技术标签:

【中文标题】Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]【英文标题】:Kafka HDFS Sink Connector error.[Top level type must be STRUCT..] 【发布时间】:2021-09-10 15:26:34 【问题描述】:

我正在使用 Kafka connect 测试 2.7 版本的 Kafka, 我面临着我不明白的问题。

我首先使用如下配置启动分布式连接器。

bootstrap.servers=..:9092,...:9092, ...
group.id=kafka-connect-test
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

... some internal topic configuration

plugin.path=<plugin path>

此连接器使用 8083 端口提供服务。

我想在 HDFS 上使用 snappy 编解码器编写 ORC 格式数据。 所以我用带有 json 数据的 REST API 制作了新的 Kafka HDFS 连接器,如下所示。 并且我不使用架构注册表。

curl -X POST <connector url:8083> \
-H Accept: application/json \
-H Content-Type: application/json \
-d

    "name": "hdfs-sinkconnect-test",
    "config": 
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "store.url": "hdfs:~",
        "hadoop.conf.dir": "<my hadoop.conf dir>",
        "hadoop.home": "<hadoop home dir>",
        "tasks.max": "5",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
        "format.class": "io.confluent.connect.hdfs.orc.OrcFormat",
        "flush.size": 1000,
        "avro.codec": "snappy",
        "topics": "<topic name>",
        "topics.dir": "/tmp/connect-logs",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "locale": "ko_KR",
        "timezone": "Asia/Seoul",
        "partition.duration.ms": "3600000",
        "path.format": "'hour'=YYYYMMddHH/"
    

然后我有这样的错误消息。

# connectDistributed.out

[2021-06-28 17:14:11,596] ERROR Exception on topic partition <topic name>-<partition number>:  (io.confluent.connect.hdfs.TopicPartitionWriter:409)
org.apache.kafka.connect.errors.ConnectException: Top level type must be STRUCT but was bytes
        at io.confluent.connect.hdfs.orc.OrcRecordWriterProvider$1.write(OrcRecordWriterProvider.java:98)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:742)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:385)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:333)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:126)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我认为此错误消息与架构信息有关。 Schema Registry 对 Kafka 连接器是否必不可少? 解决此错误消息的任何想法或解决方案?谢谢。

【问题讨论】:

【参考方案1】:

编写 ORC 文件需要 Struct 类型。

provided by Confluent 选项包括纯 JSON、JSONSchema、Avro 或 Protobuf。唯一不需要注册表的选项是普通的JsonConverter

请注意,key.deserializervalue.deserializer 不是有效的连接属性。您需要参考您的 key.convertervalue.converter 属性

如果您不愿意修改转换器,您可以尝试使用HoistField transformer 创建一个 Struct,这将创建一个只有一个字段的架构的 ORC 文件

【讨论】:

非常感谢。我会按照你说的测试一下。

以上是关于Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]的主要内容,如果未能解决你的问题,请参考以下文章

Kafka HDFS Sink Connector Protobuf 未写入

Confluent Kafka Connect HDFS Sink 连接器延迟

kafka connect - 使用 hdfs sink 连接器进行 ExtractTopic 转换抛出 NullPointerException

即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败

Docker Confluent Kafka HDFS Sink 正在运行但任务失败

kafka 连接器 jdbc-sink 最后出现语法错误