kafka connect - 使用 hdfs sink 连接器进行 ExtractTopic 转换抛出 NullPointerException

Posted

技术标签:

【中文标题】kafka connect - 使用 hdfs sink 连接器进行 ExtractTopic 转换抛出 NullPointerException【英文标题】:kafka connect - ExtractTopic transformation with hdfs sink connector throws NullPointerException 【发布时间】:2019-06-19 10:34:33 【问题描述】:

我正在使用融合 hdfs sink 连接器 5.0.0 和 kafka 2.0.0,我需要使用 ExtractTopic 转换 (https://docs.confluent.io/current/connect/transforms/extracttopic.html)。我的连接器工作正常,但是当我添加此转换时,我得到 NullPointerException,即使在只有 2 个属性的简单数据样本上也是如此。

ERROR Task hive-table-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:352)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) 

这里是连接器的配置:

name=hive-table-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=hive_table_test

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=$env.SCHEMA_REGISTRY_URL
value.converter.schema.registry.url=$env.SCHEMA_REGISTRY_URL
schema.compatibility=BACKWARD

# HDFS configuration
# Use store.url instead of hdfs.url (deprecated) in later versions. Property store.url does not work, yet
hdfs.url=$env.HDFS_URL
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/opt/cloudera/parcels/CDH/lib/hadoop
topics.dir=$env.HDFS_TOPICS_DIR

# Connector configuration
format.class=io.confluent.connect.hdfs.avro.AvroFormat
flush.size=100
rotate.interval.ms=60000

# Hive integration
hive.integration=true
hive.metastore.uris=$env.HIVE_METASTORE_URIS
hive.conf.dir=/etc/hive/conf
hive.home=/opt/cloudera/parcels/CDH/lib/hive
hive.database=kafka_connect

# Transformations
transforms=InsertMetadata, ExtractTopic
transforms.InsertMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertMetadata.partition.field=partition
transforms.InsertMetadata.offset.field=offset

transforms.ExtractTopic.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ExtractTopic.field=name
transforms.ExtractTopic.skip.missing.or.null=true

我正在使用模式注册表,数据是 avro 格式,我确信给定的属性 name 不为空。有什么建议?我需要的基本上是提取给定字段的内容并将其用作主题名称。

编辑:

它甚至发生在像这样的 avro 格式的简单 json 上:


   "attr": "tmp",
   "name": "topic1"

【问题讨论】:

有助于查看您发送和转换的实际数据 好的,我添加了示例 - 即使在具有 2 个字段的简单 json 上也会发生这种情况(见上文)。 连接器配置的输入主题名称是什么?你能包括整个连接器配置吗? 是的,我添加了整个配置。输入主题名称为hive_table_test 【参考方案1】:

简短的回答是因为,您更改了转换中的主题名称。

每个主题分区的 Hdfs 连接器都有单独的TopicPartitionWriter。当SinkTask,即负责处理消息的时候,在open(...)方法中为每个分区创建TopicPartitionWriter

当它处理 SinkRecords 时,它会根据 topic 名称和 partition 编号查找 TopicPartitionWriter 并尝试将记录附加到其缓冲区。在您的情况下,它找不到任何写入消息。主题名称已被转换更改,并且对于该对(主题、分区)没有创建任何 TopicPartitionWriter

传递给HdfsSinkTask::put(Collection<SinkRecord> records) 的 SinkRecords 已经设置了分区和主题,因此您不必应用任何转换。

我认为io.confluent.connect.transforms.ExtractTopic 应该用于SourceConnector

【讨论】:

以上是关于kafka connect - 使用 hdfs sink 连接器进行 ExtractTopic 转换抛出 NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章

使用JsonConverter的Kafka Connect HDFS Sink for JSON格式

Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?

使用kafka connect,将数据批量写到hdfs完整过程

Kafka 到 Elasticsearch、带有 Logstash 的 HDFS 或 Kafka Streams/Connect

有哪些开源解决方案可以使用 Kafka Connect 将数据从 Kafka 移动到 HDFS3?

需要使用 Kafka Connect 将小型 JSON 消息从 Kafka 移动到 HDFS,但不使用 Confluent 库,如果不是完全免费的话