使用 kafka 连接的 hdfs 中没有 avro 数据
Posted
技术标签:
【中文标题】使用 kafka 连接的 hdfs 中没有 avro 数据【英文标题】:There's no avro data in hdfs using kafka connect 【发布时间】:2018-03-03 18:58:29 【问题描述】:我正在使用 kafka 连接分发。 命令是:bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
worker配置为:
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 group.id=连接集群 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=falsekafka 连接重新开始,没有错误!
主题connect-configs,connect-offsets,connect-statuses 已创建。 主题 mysiteview 已创建。
然后我使用这样的 RESTful API 创建 kafka 连接器:
curl -X POST -H "Content-Type: application/json" --data '"name":"hdfs-sink-mysiteview","config":"connector.class":"io.confluent.connect. hdfs.HdfsSinkConnector","tasks.max":"3","topics":"mysiteview","hdfs.url":"hdfs://master1:8020","topics.dir":"/kafka/topics ","logs.dir":"/kafka/logs","format.class":"io.confluent.connect.hdfs.avro.AvroFormat","flush.size":"1000","rotate.interval. ms":"1000","partitioner.class":"io.confluent.connect.hdfs.partitioner.DailyPartitioner","path.format":"YYYY-MM-dd","schema.compatibility":"BACKWARD" ,"locale":"zh_CN","timezone":"Asia/Shanghai"' http://kafka1:8083/connectors当我为“mysiteview”主题生成数据时,如下所示:
"f1":"192.168.1.1","f2":"aa.example.com"java代码如下:
Properties props = new Properties();
props.put("bootstrap.servers","kafka1:9092");
props.put("acks","all");
props.put("retries",3);
props.put("batch.size", 16384);
props.put("linger.ms",30);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
Random rnd = new Random();
for(long nEvents = 0; nEvents < events; nEvents++)
long runtime = new Date().getTime();
String site = "www.example.com";
String ipString = "192.168.2." + rnd.nextInt(255);
String key = "" + rnd.nextInt(255);
User u = new User();
u.setF1(ipString);
u.setF2(site+" "+rnd.nextInt(255));
System.out.println(JSON.toJSONString(u));
producer.send(new ProducerRecord<String,String>("mysiteview",JSON.toJSONString(u)));
Thread.sleep(50);
producer.flush();
producer.close();
奇怪的事情发生了。 我从 kafka-logs 获取数据,但在 hdfs 中没有数据(没有主题目录)。 我尝试连接器命令:
curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/status输出是:
"name":"hdfs-sink-mysiteview","connector":"state":"RUNNING","worker_id":"10.255.223.178:8083","tasks":["state":" RUNNING","id":0,"worker_id":"10.255.223.178:8083","state":"RUNNING","id":1,"worker_id":"10.255.223.178:8083", "state":"RUNNING","id":2,"worker_id":"10.255.223.178:8083"]但是当我使用以下命令检查任务状态时:
curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/hdfs-sink-siteview-1我得到结果:“错误 404”。三个任务是一样的错误!
怎么了?
【问题讨论】:
【参考方案1】:在没有看到工作人员日志的情况下,我不确定当您使用上述设置时,您的 HDFS 连接器实例究竟是哪个异常失败了。但是我可以发现一些配置问题:
-
您提到您使用以下方式启动 Connect 工作程序:
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
。这些属性默认将键和值转换器设置为AvroConverter
,并要求您运行schema-registry
服务。如果您确实在connect-avro-distributed.properties
中编辑了配置以改用JsonConverter
,那么您的HDFS 连接器在将Kafka 记录转换为Connect 的SinkRecord
数据类型期间可能会失败,就在它尝试将您的数据导出到HDFS 之前.
直到最近,HDFS 连接器才能将 Avro 记录导出为 Avro 或 Parquet 格式的文件。这需要使用上面提到的AvroConverter
。最近添加了将记录导出为 JSON 格式的文本文件的功能,并将出现在连接器的 4.0.0
版本中(您可以通过签出并从源代码构建连接器来尝试此功能)。
此时,我的第一个建议是尝试使用bin/kafka-avro-console-producer
导入您的数据。定义他们的架构,使用bin/kafka-avro-console-consumer
确认数据已成功导入,然后如上所述将您的HDFS 连接器设置为使用AvroFormat
。连接器页面上的quickstart 描述了一个非常相似的过程,也许这将是您的用例的一个很好的起点。
【讨论】:
【参考方案2】:也许您只是使用了错误的 REST-Api。
根据文档,电话应该是
/connectors/:connector_name/tasks/:task_id
https://docs.confluent.io/3.3.1/connect/restapi.html#get--connectors-(string-name)-tasks-(int-taskid)-status
【讨论】:
以上是关于使用 kafka 连接的 hdfs 中没有 avro 数据的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?
使用 Kafka HDFS Connect 写入 HDFS 时出错
Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]
kafka connect - 使用 hdfs sink 连接器进行 ExtractTopic 转换抛出 NullPointerException