如何在Vertx Kafka客户端中使用自定义序列化程序?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在Vertx Kafka客户端中使用自定义序列化程序?相关的知识,希望对你有一定的参考价值。
我有以下kafka生产者属性。
value.serializer=MyEventSerializer
value.deserializer=MyEventDeserializer
default.value.serde=MyEventSerde
我已经通过Vertx site中的Serializers并使用了创建生产者
KafkaProducer<String, MyEvent> producer = KafkaProducer.create(vertx, configProperties, String.class, MyEvent.class);
但我收到以下错误:
SEVERE: Unknown class for built-in
serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes
java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes
有没有办法在Vertx kafka客户端中使用自定义序列化程序?
答案
我必须手动完成KafkaProducer.create()
的工作。
Serializer<String> keySerializer = VertxSerdes.serdeFrom(String.class).serializer();
Serializer<MyEvent> valueSerializer = new MyEventSerializer();
KafkaWriteStream<String, MyEvent> stream = new KafkaWriteStreamImpl(vertx.getOrCreateContext(), new org.apache.kafka.clients.producer.KafkaProducer(configProperties, keySerializer, valueSerializer));
KafkaProducer<String,MyEvent> producer=(new KafkaProducerImpl(stream)).registerCloseHook();
然后使用..写下记录
KafkaProducerRecord producerRecord= KafkaProducerRecord.create(topicName,key,value);
producer.write(producerRecord, done -> {
if (done.succeeded()) {
// TODO if succeeded
} else {
// TODO if failed
}
});
以上是关于如何在Vertx Kafka客户端中使用自定义序列化程序?的主要内容,如果未能解决你的问题,请参考以下文章
Flink实战系列Flink 1.14.0 消费 kafka 数据自定义反序列化器