在一个 Kafka Topic 下发送两个 Serialized Java 对象
Posted
技术标签:
【中文标题】在一个 Kafka Topic 下发送两个 Serialized Java 对象【英文标题】:Send two Serialized Java objects under one Kafka Topic 【发布时间】:2021-04-28 04:41:39 【问题描述】:我想实现发送和接收 Java 对象的 Kafka 消费者和生产者。完整的Source 我试过这个:
生产者:
@Configuration
public class KafkaProducerConfig
@Value(value = "$kafka.bootstrapAddress")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory()
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
@Bean
public ProducerFactory<String, String> producerFactory()
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new DefaultKafkaProducerFactory<>(configProps);
@Bean
public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate()
return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
@Bean
public KafkaTemplate<String, String> kafkaTemplate()
return new KafkaTemplate<>(producerFactory());
@Bean
public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory)
ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
发送对象:
@RestController
@RequestMapping("/checkout")
public class CheckoutController
private TransactionService transactionService;
private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
private ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;
private static String topic = "tp-sale";
@Autowired
public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate)
this.transactionService = transactionService;
this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
@PostMapping("test")
private void performPayment() throws ExecutionException, InterruptedException, TimeoutException
Transaction transaction = new Transaction();
transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());
Transaction insertedTransaction = transactionService.save(transaction);
SaleRequestFactory obj = new SaleRequestFactory();
obj.setId(100);
ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>("tp-sale", obj);
RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, SaleRequestFactory> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
SaleResponseFactory value = consumerRecord.value();
System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
消费者:
@EnableKafka
@Configuration
public class KafkaConsumerConfig
@Value(value = "$kafka.bootstrapAddress")
private String bootstrapAddress;
private String groupId = "test";
@Bean
public ConsumerFactory<String, SaleResponseFactory> consumerFactory()
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory()
ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
接收对象
@Component
public class ProcessingSaleListener
private static String topic = "tp-sale";
@KafkaListener(topics = "tp-sale")
public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception
System.out.println(tf.getId());
SaleResponseFactory resObj = new SaleResponseFactory();
resObj.setUnique_id("123123");
return resObj;
自定义对象
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable
private static final long serialVersionUID = 1744050117179344127L;
private int id;
序列化器
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory>
@Override
public byte[] serialize(String topic, SaleRequestFactory data)
ByteArrayOutputStream out = new ByteArrayOutputStream();
try
ObjectOutputStream outputStream = new ObjectOutputStream(out);
outputStream.writeObject(data);
out.close();
catch (IOException e)
throw new RuntimeException("Unhandled", e);
return out.toByteArray();
响应对象
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable
private static final long serialVersionUID = 1744050117179344127L;
private String unique_id;
响应对象的反序列化器
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory>
@Override
public SaleRequestFactory deserialize(String topic, byte[] data)
SaleRequestFactory saleRequestFactory = null;
try
ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream in = new ObjectInputStream(bis);
saleRequestFactory = (SaleRequestFactory) in.readObject();
in.close();
catch (IOException | ClassNotFoundException e)
throw new RuntimeException("Unhandled", e);
return saleRequestFactory;
我想根据对象类型发送和接收不同的序列化 Java 对象。例如,有时发送 SaleRequestFactory 并接收 SaleResponseFactory,有时发送 AuthRequestFactory 并接收 AuthResponseFactory。是否可以使用同一个主题发送和接收不同的 Java 对象?
完整示例code
【问题讨论】:
我不知道如何实现代码。你能告诉我吗?我不清楚如何为这种情况配置消费者和生产者。 【参考方案1】:这是可能的,但您需要为每个对象类型创建两个独立的生产者工厂。或者使用 ByteArraySerializer 并自己序列化对象(相当于 Gary 的回答)
如果您确实想正确地反序列化对象,那么消费者端也是如此。否则,您可以使用 ByteArrayDeserializer(同样,相当于 Gary 展示的反序列化器);由于 Java 无法仅凭字节判断其中的对象类型(这些字节是序列化的对象流),您需要在记录中包含额外的元数据(例如放在消息头中),或者使用一个特定的键,通过解析它来确定如何反序列化数据,然后自行调用相应的反序列化方法。
总体而言,我建议重新评估为什么需要把不同类型的记录放在同一个主题中;或者考虑其他消息格式,例如 CloudEvents 规范,或者使用 Avro / Protobuf / 多态 JSON 类型,这些格式与 Kafka 以外的客户端配合使用时效果更好。
【讨论】:
什么意思?是指使用 new ProducerRecord<>("topic", serializer.serialize(obj)); 发送,并在 @KafkaListener ... process(@Payload byte[] data ...) 中调用 deserializer.deserialize(data) 吗?
这是我面临的设计问题的描述***.com/questions/65811681/…你能告诉我如何用不同的生产者工厂来实现这个吗?
由于使用的是 Java 序列化,所以不需要 2 个工厂;只需使用 Object 作为值类型。在接收方,使用类级别的 @KafkaListener,并为每种类型提供一个 @KafkaHandler 方法,框架会负责路由。例如,请参阅我的答案。【参考方案2】:
使用 Object
作为值类型 - 这是一个使用 Boot 的自动配置基础设施 bean 的示例...
@SpringBootApplication
public class So65866763Application
public static void main(String[] args)
SpringApplication.run(So65866763Application.class, args);
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template)
return args ->
template.send("so65866763", new Foo());
template.send("so65866763", new Bar());
;
@Bean
public NewTopic topic()
return TopicBuilder.name("so65866763").partitions(1).replicas(1).build();
/** Empty serializable payload type used by the example. */
class Foo implements Serializable {
}

/** Second empty serializable payload type used by the example. */
class Bar implements Serializable {
}
@Component
@KafkaListener(id = "so65866763", topics = "so65866763")
class Listener
@KafkaHandler
void fooListener(Foo foo)
System.out.println("In fooListener: " + foo);
@KafkaHandler
void barListener(Bar bar)
System.out.println("In barListener: " + bar);
public class JavaSerializer implements Serializer<Object>
@Override
public byte[] serialize(String topic, Object data)
return null;
@Override
public byte[] serialize(String topic, Headers headers, Object data)
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(baos))
oos.writeObject(data);
return baos.toByteArray();
catch (IOException e)
throw new UncheckedIOException(e);
public class JavaDeserializer implements Deserializer<Object>
@Override
public Object deserialize(String topic, byte[] data)
return null;
@Override
public Object deserialize(String topic, Headers headers, byte[] data)
ByteArrayInputStream bais = new ByteArrayInputStream(data);
try (ObjectInputStream ois = new ObjectInputStream(bais))
return ois.readObject();
catch (IOException e)
throw new UncheckedIOException(e);
catch (ClassNotFoundException e)
throw new IllegalStateException(e);
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=com.example.demo.JavaSerializer
spring.kafka.consumer.value-deserializer=com.example.demo.JavaDeserializer
In fooListener: com.example.demo.Foo@331ca660
In barListener: com.example.demo.Bar@26f54288
【讨论】:
非常好的答案加里!我有兴趣问你我是否使用这种方法 Kafka 的优点/缺点是什么?目前我能想到的优势是它会消耗更少的资源并且比 JSON 类型的有效负载更快。你怎么看? 这种方法的最大缺点是Java序列化将生产者和消费者紧密耦合(两个类路径上需要相同的类)。使用 JSON,它们可以是完全不同(但兼容)的类型;此外,非 Java 应用程序可以使用/生成 JSON。还有其他多语言技术,例如Google 协议缓冲区、Avro 等 就我而言,整个应用程序将是 100% Java。所以这不是问题。你用过 Google Protocol Buffers,Avro 吗?你能分享一下为什么使用它们更好吗? 我没有,但是网上有很多意见。 我看到了一个限制。在您的示例中,对象被发送而没有回复。我需要返回ReplyObject
,每个@KafkaHandler
都不同。这可能吗?以上是关于在一个 Kafka Topic 下发送两个 Serialized Java 对象的主要内容,如果未能解决你的问题,请参考以下文章