如何在 kafka 中创建自定义序列化程序?
Posted
技术标签:
【中文标题】如何在 kafka 中创建自定义序列化程序?【英文标题】:How to create Custom serializer in kafka? 【发布时间】:2017-03-02 10:24:06 【问题描述】:只有少数可用的序列化程序,例如,
org.apache.kafka.common.serialization.StringSerializer
我们如何创建自己的自定义序列化程序?
【问题讨论】:
【参考方案1】:这里有一个示例,您可以将自己的序列化器/反序列化器用于 Kafka 消息值。对于 Kafka 消息密钥是一样的。
我们希望将 MyMessage 的序列化版本作为 Kafka 值发送,然后在消费者端将其再次反序列化为 MyMessage 对象。
在生产者端序列化 MyMessage。
您应该创建一个实现 org.apache.kafka.common.serialization.Serializer 的序列化程序类
serialize() 方法完成这项工作,接收您的对象并将序列化版本作为字节数组返回。
public class MyValueSerializer implements Serializer<MyMessage>
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
this.isKey = isKey;
@Override
public byte[] serialize(String topic, MyMessage message)
if (message == null)
return null;
try
(serialize your MyMessage object into bytes)
return bytes;
catch (IOException | RuntimeException e)
throw new SerializationException("Error serializing value", e);
@Override
public void close()
final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));
在消费者端反序列化 MyMessage。
您应该创建一个实现 org.apache.kafka.common.serialization.Deserializer 的反序列化器类
deserialize() 方法完成工作,接收序列化值作为字节数组并返回您的对象。
public class MyValueDeserializer implements Deserializer<MyMessage>
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
this.isKey = isKey;
@Override
public MyMessage deserialize(String s, byte[] value)
if (value == null)
return null;
try
(deserialize value into your MyMessage object)
MyMessage message = new MyMessage();
return message;
catch (IOException | RuntimeException e)
throw new SerializationException("Error deserializing value", e);
@Override
public void close()
然后像这样使用它:
final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records)
int kafkaKey = record.key();
MyMessage kafkaValue = record.value();
...
【讨论】:
final KafkaConsumer没有文字,只有代码
您发送给 Kafka 的一些对象
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class TestDto
private String name;
private String version;
创建序列化器,生产者将使用该序列化器
@Slf4j
public class KafkaValueSerializer implements Serializer<TestDto>
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey)
@Override
public byte[] serialize(String topic, TestDto data)
try
return objectMapper.writeValueAsBytes(data);
catch (JsonProcessingException e)
log.error("Unable to serialize object ", data, e);
return null;
@Override
public void close()
当然,不要忘记 Deserializer for Consumer
@Slf4j
public class KafkaValueDeserializer implements Deserializer<TestDto>
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey)
@Override
public TestDto deserialize(String topic, byte[] data)
try
return objectMapper.readValue(new String(data, "UTF-8"), TestDto.class);
catch (Exception e)
log.error("Unable to deserialize message ", data, e);
return null;
@Override
public void close()
最后一刻,将序列化器/反序列化器添加到 application.yml
spring:
kafka:
bootstrap-servers: 192.168.192.168:9092
producer:
value-serializer: com.package.service.kafka.KafkaValueSerializer
consumer:
group-id: groupId
value-deserializer: com.package.service.kafka.KafkaValueDeserializer
就是这样。不需要任何配置文件或手鼓跳舞:)
发送
KafkaTemplate<String, TestDto> kafkaTemplate;
TestDto test = new TestDto("test name", "test-version");
kafkaTemplate.send(topic, testDto);
听
@KafkaListener(topics = "$ktp-agent.kafka.request-topic", groupId = "$spring.kafka.consumer.group-id")
public void listen(TestDto message)
log.info("Received message '' from Kafka.", message.toString());
【讨论】:
Spring Kafka 已经有一个 JSON 序列化器和反序列化器,所以可能可以从这个答案中删除它们 是否可以通过 kafka 发送 Map您必须创建自己的序列化程序来实现接口Serializer
(org.apache.kafka.common.serialization.Serializer
),然后将生产者选项key.serializer
/ value.serializer
设置为它。
【讨论】:
以上是关于如何在 kafka 中创建自定义序列化程序?的主要内容,如果未能解决你的问题,请参考以下文章