Kafka HDFS Sink Connector Protobuf 未写入

Posted

技术标签:

【中文标题】Kafka HDFS Sink Connector Protobuf 未写入【英文标题】:Kafka HDFS Sink Connector Protobuf not being written 【发布时间】:2021-10-01 00:31:58 【问题描述】:

我正在尝试使用 Kafka HDFS 3 sink 连接器将 protobuf 二进制文件写入 HDFS。但是,连接器一直在写入 avro 文件。

我已经使用以下配置设置了我的接收器连接器


    "name": "hdfs3-connector-test",
    "config": 
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "testproto",
        "hdfs.url": "hdfs://10.8.0.1:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://10.8.0.1:8081",
        "confluent.topic.bootstrap.servers": "10.8.0.1:9092",
        "confluent.topic.replication.factor": "1",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true"
    

如您所见,我正在使用 ProtobufConverter 作为值转换器,并且安装了插件。 (ProtobufConverter 是否转换为 Avro 文件格式?)。

我还使用以下 Java 文件注册了我的架构并将数据发送到主题:

package app;

import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import test.Test.*;

public class App 

    public static void main( String[] args )
    
        try  
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.8.0.1:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.class.getName());
            props.put("schema.registry.url", "http://10.8.0.1:8081");

            KafkaProducer<String, MyMsg> producer = new KafkaProducer<String, MyMsg>(props);
            String topic = "testproto";
            String key = "testkey";
            MyMsg m = MyMsg.newBuilder().setF1("testing").build();
            ProducerRecord<String, MyMsg> record = new ProducerRecord<String, MyMsg>(topic, key, m);
            producer.send(record).get();
            producer.close();
         catch (Exception e) 
            System.out.println(e.toString());
        
    


这是我的原型文件

syntax = "proto3";
package test;

message MyMsg 
    string f1 = 1;

所以我的问题是,这是正确的吗?我只能使用此连接器将 Avro 文件写入 HDFS 吗?还是我的配置不正确,我应该期待 HDFS 中的 protobuf 文件?

【问题讨论】:

【参考方案1】:

你需要设置format.class配置,

format.class 是 将数据写入存储时使用的格式类。格式类实现 io.confluent.connect.storage.format.Format 接口。

类型:类 默认值:io.confluent.connect.hdfs3.avro.AvroFormat 重要性:高 这些类默认可用:

io.confluent.connect.hdfs3.avro.AvroFormat io.confluent.connect.hdfs3.json.JsonFormat io.confluent.connect.hdfs3.parquet.ParquetFormat io.confluent.connect.hdfs3.string.StringFormat

https://docs.confluent.io/kafka-connect-hdfs3-sink/current/configuration_options.html#hdfs3-config-options

【讨论】:

好收获。好像暂时没有对format.class的protobuf支持。 它可以实现我猜因为代码在github中提供 如果我的回答对您有帮助,您应该接受或至少 +1,谢谢 Github 中没有 HDFS3 连接器代码。 github.com/confluentinc/kafka-connect-hdfs/tree/master/src/main/…

以上是关于Kafka HDFS Sink Connector Protobuf 未写入的主要内容,如果未能解决你的问题,请参考以下文章

无法将 Kafka 与 InfluxDB Sink Connector 连接

Kafka JDBC Sink Connector,批量插入值

Kafka Confluent HTTP Sink Connector 的开源替代方案 [关闭]

Confluent Kafka Sink Connector 未将数据加载到 Postgres 表

Confluent Kafka Connect MySQL Sink Connector 的开源替代方案?

Kafka JDBC Sink Connector 在雪花中找不到表