Kafka HDFS Sink Connector Protobuf 未写入
Posted
技术标签:
【中文标题】Kafka HDFS Sink Connector Protobuf 未写入【英文标题】:Kafka HDFS Sink Connector Protobuf not being written 【发布时间】:2021-10-01 00:31:58 【问题描述】:我正在尝试使用 Kafka HDFS 3 sink 连接器将 protobuf 二进制文件写入 HDFS。但是,连接器一直在写入 avro 文件。
我已经使用以下配置设置了我的接收器连接器
"name": "hdfs3-connector-test",
"config":
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"tasks.max": "1",
"topics": "testproto",
"hdfs.url": "hdfs://10.8.0.1:9000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://10.8.0.1:8081",
"confluent.topic.bootstrap.servers": "10.8.0.1:9092",
"confluent.topic.replication.factor": "1",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"
如您所见,我正在使用 ProtobufConverter 作为值转换器,并且安装了插件。 (ProtobufConverter 是否转换为 Avro 文件格式?)。
我还使用以下 Java 文件注册了我的架构并将数据发送到主题:
package app;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import test.Test.*;
public class App
public static void main( String[] args )
try
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.8.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.class.getName());
props.put("schema.registry.url", "http://10.8.0.1:8081");
KafkaProducer<String, MyMsg> producer = new KafkaProducer<String, MyMsg>(props);
String topic = "testproto";
String key = "testkey";
MyMsg m = MyMsg.newBuilder().setF1("testing").build();
ProducerRecord<String, MyMsg> record = new ProducerRecord<String, MyMsg>(topic, key, m);
producer.send(record).get();
producer.close();
catch (Exception e)
System.out.println(e.toString());
这是我的原型文件
syntax = "proto3";
package test;
message MyMsg
string f1 = 1;
所以我的问题是,这是正确的吗?我只能使用此连接器将 Avro 文件写入 HDFS 吗?还是我的配置不正确,我应该期待 HDFS 中的 protobuf 文件?
【问题讨论】:
【参考方案1】:你需要设置format.class
配置,
format.class 是 将数据写入存储时使用的格式类。格式类实现 io.confluent.connect.storage.format.Format 接口。
类型:类 默认值:io.confluent.connect.hdfs3.avro.AvroFormat 重要性:高 这些类默认可用:
io.confluent.connect.hdfs3.avro.AvroFormat io.confluent.connect.hdfs3.json.JsonFormat io.confluent.connect.hdfs3.parquet.ParquetFormat io.confluent.connect.hdfs3.string.StringFormat
https://docs.confluent.io/kafka-connect-hdfs3-sink/current/configuration_options.html#hdfs3-config-options
【讨论】:
好收获。好像暂时没有对format.class的protobuf支持。 它可以实现我猜因为代码在github中提供 如果我的回答对您有帮助,您应该接受或至少 +1,谢谢 Github 中没有 HDFS3 连接器代码。 github.com/confluentinc/kafka-connect-hdfs/tree/master/src/main/…以上是关于Kafka HDFS Sink Connector Protobuf 未写入的主要内容,如果未能解决你的问题,请参考以下文章
无法将 Kafka 与 InfluxDB Sink Connector 连接
Kafka JDBC Sink Connector,批量插入值
Kafka Confluent HTTP Sink Connector 的开源替代方案 [关闭]
Confluent Kafka Sink Connector 未将数据加载到 Postgres 表