使用 Kafka HDFS Connect 写入 HDFS 时出错
Posted
技术标签:
【中文标题】使用 Kafka HDFS Connect 写入 HDFS 时出错【英文标题】:Error while writing to HDFS using Kafka HDFS Connect 【发布时间】:2016-12-02 06:52:22 【问题描述】:我正在尝试使用 kafka HDFS 连接器以 avro 格式将数据从我的 Java 代码写入 Kafka 到 HDFS,但我遇到了一些问题。当我使用 confluent 平台网站上提供的简单架构和数据时,我能够将数据写入 HDFS,但是当我尝试使用复杂的 avro 架构时,我在 HDFS 连接器日志中收到此错误:
ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Did not find matching union field for data: PROD
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:973)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:782)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我使用的是融合平台 3.0.0
我的 Java 代码:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", <url>);
// Set any other properties
KafkaProducer producer = new KafkaProducer(props);
Schema schema = new Schema.Parser().parse(new FileInputStream("avsc/schema.avsc"));
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
InputStream input = new FileInputStream("json/data.json");
DataInputStream din = new DataInputStream(input);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
Object datum = null;
while (true)
try
datum = reader.read(null, decoder);
catch (EOFException e)
break;
ProducerRecord<Object, Object> message = new ProducerRecord<Object, Object>(topic, datum);
producer.send(message);
producer.close();
架构(这是从 avdl 文件创建的):
"type" : "record",
"name" : "RiskMeasureEvent",
"namespace" : "risk",
"fields" : [
"name" : "info",
"type" :
"type" : "record",
"name" : "RiskMeasureInfo",
"fields" : [
"name" : "source",
"type" :
"type" : "record",
"name" : "Source",
"fields" : [
"name" : "app",
"type" :
"type" : "record",
"name" : "Application",
"fields" : [
"name" : "csi_id",
"type" : "string"
,
"name" : "name",
"type" : "string"
]
,
"name" : "env",
"type" :
"type" : "record",
"name" : "Environment",
"fields" : [
"name" : "value",
"type" : [
"type" : "enum",
"name" : "EnvironmentConstants",
"symbols" : [ "DEV", "UAT", "PROD" ]
, "string" ]
]
, ...
json文件:
"info":
"source":
"app":
"csi_id": "123",
"name": "ABC"
,
"env":
"value":
"risk.EnvironmentConstants": "PROD"
, ...
这似乎是架构的问题,但我无法确定问题。
【问题讨论】:
【参考方案1】:我是 Confluent 的工程师。这是 Avro 转换器如何处理 env 的联合模式的一个错误。我创建了issue-393 来解决这个问题。我还将pull request 与修复程序放在一起。这应该很快合并。
J
【讨论】:
您好 Jeremy,感谢您的修复。我已经从您的分支下载了最新的架构注册表代码。由于它尚未包含在 confluent 包中,因此我下载了 apache kafka 和 kafka-hdfs-connect 的代码并在本地构建它们。尝试运行 hdfs 连接器时,尝试加载 AvroConverter 文件(位于模式注册表中)时出错。我可以知道如何配置连接器以便它能够找到那个 jar 吗?以上是关于使用 Kafka HDFS Connect 写入 HDFS 时出错的主要内容,如果未能解决你的问题,请参考以下文章
Confluent Kafka Connect HDFS Sink 连接器延迟
kafka-connect-hdfs重启,进去RECOVERY状态,从hadoop hdfs拿租约,很正常,但是也太久了吧
使用JsonConverter的Kafka Connect HDFS Sink for JSON格式
Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?