Kafka 生产者 自定义序列化
Posted fubinhnust
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 生产者 自定义序列化相关的知识,希望对你有一定的参考价值。
Kafka在生产者中序列化为二进制对象推送给Broker,下面是一个自定义序列化的示例,序列化一个User对象;
首先,引入jackson-mapper-asl
<dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>1.9.12</version> </dependency>
然后定义需要被序列化的实体类:
package cn.org.fubin; public class User { private String firstName; private String lastName; private int age; private String address; public User() { } public User(String firstName, String lastName, int age, String address) { this.firstName = firstName; this.lastName = lastName; this.age = age; this.address = address; } public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } @Override public String toString() { return "User{" + "firstName=‘" + firstName + ‘‘‘ + ", lastName=‘" + lastName + ‘‘‘ + ", age=" + age + ", address=‘" + address + ‘‘‘ + ‘}‘; } }
接下来,创建序列化类,实现Kafka客户端提供的Serializer接口:
import org.apache.kafka.common.serialization.Serializer; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.util.Map; public class UserSerializer implements Serializer { private ObjectMapper objectMapper; public void configure(Map configs, boolean isKey) { objectMapper = new ObjectMapper(); } public byte[] serialize(String topic, Object data) { byte[] ret = null; try { ret = objectMapper.writeValueAsString(data).getBytes("utf-8"); } catch (IOException e) { System.out.println("序列化失败"); e.printStackTrace(); } return ret; } public void close() { } }
Kafka默认提供如下实现:
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.errors.RetriableException; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * * 可重试异常 * 1. 分区副本不可用 * 2. Controller当前不可用 * 3. 网络瞬时故障 * * 可自行恢复,超过重试次数也需要自行处理 * * * 不可重试异常 * 1. 发送消息尺寸过大 * 2. 序列化失败异常 * 3. 其他类型异常 * * */ public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "cn.org.fubin.UserSerializer"); properties.put("acks", "-1"); System.out.println(ProducerConfig.ACKS_CONFIG); properties.put("retries", "3"); properties.put("batch.size", 1048576); properties.put("linger.ms", 10); properties.put("buffer.memory", "33554432"); System.out.println(ProducerConfig.BUFFER_MEMORY_CONFIG); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4"); properties.put("max.block.ms", "3000"); String topic = "test-topic"; Producer<String,User> producer = new KafkaProducer<String, User>(properties); User user = new User("a","b",23,"china"); ProducerRecord<String ,User> record = new ProducerRecord<String, User>(topic,user); producer.send(record).get(); producer.close(); } }
然后在主类中指定声明好的序列化类,并发送一个User实体:
以上是关于Kafka 生产者 自定义序列化的主要内容,如果未能解决你的问题,请参考以下文章
Flink实战系列Flink 1.14.0 消费 kafka 数据自定义反序列化器
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段