在一个 Kafka Topic 下发送两个 Serialized Java 对象

Posted

技术标签:

【中文标题】在一个 Kafka Topic 下发送两个 Serialized Java 对象【英文标题】:Send two Serialized Java objects under one Kafka Topic 【发布时间】:2021-04-28 04:41:39 【问题描述】:

我想实现发送和接收 Java 对象的 Kafka 消费者和生产者（完整源码见文末链接）。我试过这个：

生产者（Producer）：

    @Configuration
public class KafkaProducerConfig 

    @Value(value = "$kafka.bootstrapAddress")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() 
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    

    @Bean
    public ProducerFactory<String, String> producerFactory() 
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaProducerFactory<>(configProps);
    


    @Bean
    public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() 
        return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
    

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() 
        return new KafkaTemplate<>(producerFactory());
    

    @Bean
    public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) 
        ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
        kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
        return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    

发送对象:

 @RestController
@RequestMapping("/checkout")
public class CheckoutController 
    
    private TransactionService transactionService;
    private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
    private ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;
    private static String topic = "tp-sale";

    @Autowired
    public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
                              KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate)
        this.transactionService = transactionService;
        this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    

    @PostMapping("test")
    private void performPayment() throws ExecutionException, InterruptedException, TimeoutException 

        Transaction transaction = new Transaction();
        transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());

        Transaction insertedTransaction = transactionService.save(transaction);

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>("tp-sale", obj);
        RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String, SaleRequestFactory> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);


        SaleResponseFactory value = consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
    

消费者:

@EnableKafka
@Configuration
public class KafkaConsumerConfig 

    @Value(value = "$kafka.bootstrapAddress")
    private String bootstrapAddress;

    private String groupId = "test";

    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() 
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() 
        ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    

接收对象

@Component
public class ProcessingSaleListener 

    private static String topic = "tp-sale";

    @KafkaListener(topics = "tp-sale")
    public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception 

        System.out.println(tf.getId());

        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    

自定义对象

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable 

    private static final long serialVersionUID = 1744050117179344127L;
    
    private int id;


序列化器

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> 

    @Override
    public byte[] serialize(String topic, SaleRequestFactory data)
    
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try
        
            ObjectOutputStream outputStream = new ObjectOutputStream(out);
            outputStream.writeObject(data);
            out.close();
        
        catch (IOException e)
        
            throw new RuntimeException("Unhandled", e);
        
        return out.toByteArray();
    

响应对象

import java.io.Serializable;
import java.time.LocalDateTime;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable 

    private static final long serialVersionUID = 1744050117179344127L;

    private String unique_id;

响应反序列化器：

import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> 

    @Override
    public SaleRequestFactory deserialize(String topic, byte[] data)
    
        SaleRequestFactory saleRequestFactory = null;
        try
        
            ByteArrayInputStream bis = new ByteArrayInputStream(data);
            ObjectInputStream in = new ObjectInputStream(bis);
            saleRequestFactory = (SaleRequestFactory) in.readObject();
            in.close();
        
        catch (IOException | ClassNotFoundException e)
        
            throw new RuntimeException("Unhandled", e);
        
        return saleRequestFactory;
    

我想根据对象类型发送和接收不同的序列化 Java 对象。例如，有时发送 SaleRequestFactory 并接收 SaleResponseFactory，或发送 AuthRequestFactory 并接收 AuthResponseFactory。是否可以使用同一个主题发送和接收不同的 Java 对象？

完整示例code

【问题讨论】:

我不知道如何实现代码。你能告诉我吗?我不清楚如何为这种情况配置消费者和生产者。 【参考方案1】:

这是可能的,但您需要为每个对象类型创建两个独立的生产者工厂。或者使用 ByteArraySerializer 并自己序列化对象(相当于 Gary 的回答)

如果您确实想要正确反序列化对象,那么对于消费者来说也是如此。否则,您将使用 ByteArrayDeserializer(同样,相当于 Gary 显示的反序列化器),然后假设 Java 无法确定字节中的对象类型(哪些是序列化对象流,您将在记录中包含额外的元数据,例如作为标题,或您解析以确定如何反序列化数据的特定键,然后自己调用相应的反序列化方法

总体而言,我建议重新评估为什么需要将不同类型的记录放在一个主题中,或者查看替代消息格式,包括 CloudEvents 规范之类的内容,或者使用 Avro / Protobuf / 多态 JSON 类型,这会起作用更好地与 Kafka 以外的客户端一起使用

【讨论】:

什么? new ProducerRecord&lt;&gt;("topic", serializer.serialize(obj));? @KafkaListener... process(@Payload byte[] data...)deserializer.deserialize(data) 通话? 这是我面临的设计问题的描述***.com/questions/65811681/…你能告诉我如何用不同的生产者工厂来实现这个吗? 由于使用的是Java序列化,所以不需要2个工厂;只需使用Object 作为值类型。在接收方,使用类级别的@KafkaListener@KafkaHandler 方法为每种类型和框架将负责路由。例如,请参阅我的答案。【参考方案2】:

使用 Object 作为值类型 - 这是一个使用 Boot 的自动配置基础设施 bean 的示例...

@SpringBootApplication
public class So65866763Application 

    public static void main(String[] args) 
        SpringApplication.run(So65866763Application.class, args);
    

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) 
        return args -> 
            template.send("so65866763", new Foo());
            template.send("so65866763", new Bar());
        ;
    

    @Bean
    public NewTopic topic() 
        return TopicBuilder.name("so65866763").partitions(1).replicas(1).build();
    



// Two distinct Serializable payload types sent on the same topic
// (braces restored — they were stripped by the page scraper).
class Foo implements Serializable {
}

class Bar implements Serializable {
}

@Component
@KafkaListener(id = "so65866763", topics = "so65866763")
class Listener 

    @KafkaHandler
    void fooListener(Foo foo) 
        System.out.println("In fooListener: " + foo);
    

    @KafkaHandler
    void barListener(Bar bar) 
        System.out.println("In barListener: " + bar);
    


public class JavaSerializer implements Serializer<Object> 

    @Override
    public byte[] serialize(String topic, Object data) 
        return null;
    

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) 
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) 
            oos.writeObject(data);
            return baos.toByteArray();
        
        catch (IOException e) 
            throw new UncheckedIOException(e);
        
    


public class JavaDeserializer implements Deserializer<Object> 

    @Override
    public Object deserialize(String topic, byte[] data) 
        return null;
    

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) 
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        try (ObjectInputStream ois = new ObjectInputStream(bais)) 
            return ois.readObject();
        
        catch (IOException e) 
            throw new UncheckedIOException(e);
        
        catch (ClassNotFoundException e) 
            throw new IllegalStateException(e);
        
    


spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.producer.value-serializer=com.example.demo.JavaSerializer
spring.kafka.consumer.value-deserializer=com.example.demo.JavaDeserializer
In fooListener: com.example.demo.Foo@331ca660
In barListener: com.example.demo.Bar@26f54288

【讨论】:

非常好的答案加里!我有兴趣问你我是否使用这种方法 Kafka 的优点/缺点是什么?目前我能想到的优势是它会消耗更少的资源并且比 JSON 类型的有效负载更快。你怎么看? 这种方法的最大缺点是Java序列化将生产者和消费者紧密耦合(两个类路径上需要相同的类)。使用 JSON,它们可以是完全不同(但兼容)的类型;此外,非 Java 应用程序可以使用/生成 JSON。还有其他多语言技术,例如Google 协议缓冲区、Avro 等 就我而言,整个应用程序将是 100% Java。所以这不是问题。你用过 Google Protocol Buffers,Avro 吗?你能分享一下为什么使用它们更好吗? 我没有,但是网上有很多意见。 我看到了一个限制。在您的示例中,对象被发送而没有回复。我需要返回ReplyObject,每个@KafkaHandler 都不同。这可能吗?

以上是关于在一个 Kafka Topic 下发送两个 Serialized Java 对象的主要内容,如果未能解决你的问题,请参考以下文章

怎么彻底删除kafka的topic,然后重建

windows 下远程连接kafka服务器并创建topic 部署服务

kafka删除topic

kafka集群环境下topic不可用

kafka命令行的管理使用

如何查看kafka 各partition数据量