spring kafka中是不是有多个生产者的代码示例?

Posted

技术标签:

【中文标题】spring kafka中是不是有多个生产者的代码示例?【英文标题】:Is there a code sample for multiple producers in spring kafka?spring kafka中是否有多个生产者的代码示例? 【发布时间】:2018-08-04 04:13:47 【问题描述】:

我有一个可能需要多个生产者的应用程序。我看到的所有代码示例似乎都支持单个生产者,在应用启动期间从应用读取配置。如果有多个生产者并且我们想传入不同的生产者配置,那么 Spring 是否提供开箱即用的支持?还是在这种情况下我应该不使用弹簧?

【问题讨论】:

【参考方案1】:

您可以通过同一个 ProducerFactory 创建多个 Producer 实例 (KafkaTemplate)。

如果您需要不同的 Kafka 配置,则需要不同的 ProducerFactory 实例。

【讨论】:

【参考方案2】:

你必须创建两个不同的ProducerFactory 下面是示例

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    import java.util.HashMap;

    @Configuration
    public class KafkaProducerConfig 


        @Bean
        public ProducerFactory<String, String> confluentProducerFactory() 

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9092");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        


        @Bean
        public ProducerFactory<String, String> cloudraProducerFactory() 

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9094");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        

        @Bean(name = "confluent")
        public KafkaTemplate<String, String> confluentKafkaTemplate() 
            return new KafkaTemplate<>(confluentProducerFactory());
        

        @Bean(name = "cloudera")
        public KafkaTemplate<String, String> clouderaKafkaTemplate() 
            return new KafkaTemplate<>(cloudraProducerFactory());
        

    




public class ProducerExample 

    @Autowired
    @Qualifier("cloudera")
    private KafkaTemplate clouderaKafkaTemplate;


    @Autowired
    @Qualifier("confluent")
    private KafkaTemplate confluentKafkaTemplate;

    public void send() 
        confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
        clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
    


【讨论】:

和多个消费者? 应用属性中能否定义不同的生产者工厂(即生产者kafka config)? @user518066 是的,请参阅下面的答案。【参考方案3】:

从 2.5 版开始,您可以使用 RoutingKafkaTemplate 在运行时根据目标主题名称选择生产者。 https://docs.spring.io/spring-kafka/reference/html/#routing-template

【讨论】:

【参考方案4】:

如果您仍想像往常一样保留application.yaml 中的配置,并尽可能减少Java 配置,您可以扩展KafkaProperties.Producer


@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer 
    private final KafkaProperties common;

    @Qualifier("producer-1")
    @Bean
    public ProducerFactory<?, ?> producerFactory() 
        final var conf = new HashMap<>(
            this.common.buildProducerProperties()
        );
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);

    

    @Qualifier("producer-1")
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() 
        return new KafkaTemplate<>(this.producerFactory());

    


@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer 
    private final KafkaProperties common;

    @Qualifier("producer-2")
    @Bean
    public ProducerFactory<?, ?> producerFactory() 
        final var conf = new HashMap<>(
            this.common.buildProducerProperties()
        );
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);

    

    @Qualifier("producer-2")
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() 
        return new KafkaTemplate<>(this.producerFactory());

    


【讨论】:

以上是关于spring kafka中是不是有多个生产者的代码示例?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka生产者——结合spring开发

kafka

初识Kafka

Kafka知识总结

Spring Kafka 生产者抛出 TimeoutExceptions

2.kafka架构深入——生产者