Kafka鈥斺€擲pringBoot鏁村悎锛堟秷璐硅€咃級

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka鈥斺€擲pringBoot鏁村悎锛堟秷璐硅€咃級相关的知识,希望对你有一定的参考价值。

鏍囩锛?a href='http://www.mamicode.com/so/1/details' title='details'>details   鎵归噺   pid   鑾峰彇   搴忓垪   鎻愪氦   RKE   depend   org   

閫夋嫨鑷姩鎻愪氦杩樻槸鎵嬪姩鎻愪氦鏂瑰紡鍜屼笟鍔″満鏅浉鍏筹紝鍙互鏌ョ湅鍓嶉潰鐨勫崥瀹紝鏍规嵁鍘熺悊杩涜閫夋嫨銆?/p>

pom

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>

consumerConfig

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * 鍗曠嚎绋?鍗曟潯娑堣垂
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> stringKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic");

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
        return factory;
    }

}

consumer

@Component
public class KafkaReceiver {
    
    private static Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);


    //@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
    public void receiveString(String message) {
        logger.info("Message : %s" +message);
    }



    /**
     * 娉ㄨВ鏂瑰紡鑾峰彇娑堟伅澶村強娑堟伅浣?     *
     * @Payload锛氳幏鍙栫殑鏄秷鎭殑娑堟伅浣擄紝涔熷氨鏄彂閫佸唴瀹?     * @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)锛氳幏鍙栧彂閫佹秷鎭殑key
     * @Header(KafkaHeaders.RECEIVED_PARTITION_ID)锛氳幏鍙栧綋鍓嶆秷鎭槸浠庡摢涓垎鍖轰腑鐩戝惉鍒扮殑
     * @Header(KafkaHeaders.RECEIVED_TOPIC)锛氳幏鍙栫洃鍚殑TopicName
     * @Header(KafkaHeaders.RECEIVED_TIMESTAMP)锛氳幏鍙栨椂闂存埑
     *
     */

    //@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
    public void receive(@Payload String message,
                         //@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        logger.info("topic : " +topic);
        logger.info("partition : " +partition);
        //logger.info("key : " +key.toString());
        logger.info("TIMESTAMP : " +ts);
        logger.info("message : " +message);
    }



/**
     * 鎸囧畾娑堣垂鍒嗗尯鍜屽垵濮嬪亸绉婚噺
     *
     * @TopicPartition锛歵opic--闇€瑕佺洃鍚殑Topic鐨勫悕绉帮紝partitions --闇€瑕佺洃鍚琓opic鐨勫垎鍖篿d锛宲artitionOffsets --鍙互璁剧疆浠庢煇涓亸绉婚噺寮€濮嬬洃鍚?     * @PartitionOffset锛歱artition --鍒嗗尯Id锛岄潪鏁扮粍锛宨nitialOffset --鍒濆鍋忕Щ閲?     *
     */

    @KafkaListener(containerFactory = "stringKafkaListenerContainerFactory",
                   topicPartitions = @TopicPartition(topic = "hello", partitionOffsets = @PartitionOffset(partition = "0" , initialOffset = "2")))
    public void receiveFromBegin(@Payload String payload,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(String.format("Read all message from partition %d : %s", partition, payload));
    }


/**
     * ConsumerRecord 鎺ユ敹
     *
     * @param record
     */

    //@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> record) {
        System.out.println("Message is :" + record.toString());
    }
}

鎵归噺娑堣垂

寮€鍚壒閲忔秷璐归渶瑕?姝?br /> 1銆佹秷璐硅€呰缃?max.poll.records/
2銆佹秷璐硅€?寮€鍚壒閲忔秷璐?factory.setBatchListener(true);
3銆佹秷璐硅€呮壒閲忔帴鏀?public void consumerBatch(List<ConsumerRecord<?, ?>> records)

javaConfig

@Configuration
@EnableKafka
public class BatchConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * 澶氱嚎绋?鎵归噺娑堣垂
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 鎺у埗澶氱嚎绋嬫秷璐?骞跺彂鏁?濡傛灉topic鏈?鍚勫垎鍖恒€傝缃垚3锛屽苟鍙戞暟灏辨槸3涓嚎绋嬶紝鍔犲揩娑堣垂), 涓嶈缃畇etConcurrency灏变細鍙樻垚鍗曠嚎绋嬮厤缃? MAX_POLL_RECORDS_CONFIG涔熶細澶辨晥锛屾帴鏀剁殑娑堟伅鍒楄〃涔熶笉浼氭槸ConsumerRecord
        factory.setConcurrency(10);
        // poll瓒呮椂鏃堕棿
        factory.getContainerProperties().setPollTimeout(1500);
        // 鎺у埗鎵归噺娑堣垂
        // 璁剧疆涓烘壒閲忔秷璐癸紝姣忎釜鎵规鏁伴噺鍦↘afka閰嶇疆鍙傛暟涓缃紙max.poll.records锛?        factory.setBatchListener(true);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 娑堣垂鑰呴厤缃?     * @return
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> configProps = new HashMap<>();
        // 涓嶇敤鎸囧畾鍏ㄩ儴鐨刡roker锛屽畠灏嗚嚜鍔ㄥ彂鐜伴泦缇や腑鐨勫叾浣欑殑borker, 鏈€濂芥寚瀹氬涓紝涓囦竴鏈夋湇鍔″櫒鏁呴殰
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // key搴忓垪鍖栨柟寮?        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // value搴忓垪鍖栨柟寮?        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // GroupID
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // 鎵归噺娑堣垂娑堟伅鏁伴噺
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
        // 鑷姩鎻愪氦鍋忕Щ閲?        // 濡傛灉璁剧疆鎴恡rue,鍋忕Щ閲忕敱auto.commit.interval.ms鎺у埗鑷姩鎻愪氦鐨勯鐜?        // 濡傛灉璁剧疆鎴恌alse,涓嶉渶瑕佸畾鏃剁殑鎻愪氦offset锛屽彲浠ヨ嚜宸辨帶鍒秓ffset锛屽綋娑堟伅璁や负宸叉秷璐硅繃浜嗭紝杩欎釜鏃跺€欏啀鍘绘彁浜ゅ畠浠殑鍋忕Щ閲忋€?        // 杩欎釜寰堟湁鐢ㄧ殑锛屽綋娑堣垂鐨勬秷鎭粨鍚堜簡涓€浜涘鐞嗛€昏緫锛岃繖涓秷鎭氨涓嶅簲璇ヨ涓烘槸宸茬粡娑堣垂鐨勶紝鐩村埌瀹冨畬鎴愪簡鏁翠釜澶勭悊銆?        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 鑷姩鎻愪氦鐨勯鐜?        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Session瓒呮椂璁剧疆
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // 璇ュ睘鎬ф寚瀹氫簡娑堣垂鑰呭湪璇诲彇涓€涓病鏈夊亸绉婚噺鐨勫垎鍖烘垨鑰呭亸绉婚噺鏃犳晥鐨勬儏鍐典笅璇ヤ綔浣曞鐞嗭細
        // latest锛堥粯璁ゅ€硷級鍦ㄥ亸绉婚噺鏃犳晥鐨勬儏鍐典笅锛屾秷璐硅€呭皢浠庢渶鏂扮殑璁板綍寮€濮嬭鍙栨暟鎹紙鍦ㄦ秷璐硅€呭惎鍔ㄤ箣鍚庣敓鎴愮殑璁板綍锛?        // earliest 锛氬湪鍋忕Щ閲忔棤鏁堢殑鎯呭喌涓嬶紝娑堣垂鑰呭皢浠庤捣濮嬩綅缃鍙栧垎鍖虹殑璁板綍
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return configProps;
    }
}

娑堣垂鑰?BatchConsumer

@Component
@Slf4j
public class BatchConsumer {
    /**
     * 鎵归噺娑堟伅
     * @param records
     */
    @KafkaListener(topics = "hello", containerFactory="batchFactory")
    public void consumerBatch(List<ConsumerRecord<?, ?>> records){
        log.info("鎺ユ敹鍒版秷鎭暟閲忥細{}",records.size());
        for(ConsumerRecord record: records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                System.out.println("鎺ユ敹鍒版秷鎭細" + message);
            }
        }
    }
    
}

鍙傝€冨崥瀹細https://blog.csdn.net/yy756127197/article/details/103895413

以上是关于Kafka鈥斺€擲pringBoot鏁村悎锛堟秷璐硅€咃級的主要内容,如果未能解决你的问题,请参考以下文章

宸ヤ綔娴佸紩鎿嶢ctiviti涓嶴pringBoot2鏁村悎--寮€婧愯蒋浠惰癁鐢?7

鏂伴矞鍑虹倝锛岃繖鏄叏缃戣鐨勬渶璇︾粏鐨剆pringboot鏁村悎娑堟伅鏈嶅姟浜嗗惂锛屽缓璁敹钘忥紒

springboot鏁村悎activiti

Springboot鏁村悎Junit

Spring Boot 鏁村悎 Mybatis-Plus

JavaWeb_(Spring妗嗘灦)Spring鏁村悎Hibernate