Kafka — Spring Boot Integration (Consumer)
Whether to use automatic or manual offset commits depends on your business scenario; see the earlier posts in this series and choose based on how each mechanism works.
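As a point of comparison with the auto-commit setup used in the rest of this post, here is a minimal sketch of a manual-commit configuration. The bean name manualKafkaListenerContainerFactory, the group id manual-group and the topic manual-demo are made-up names for illustration, the bootstrapServers field is assumed to live in a config class like the one below, and a recent spring-kafka version (ContainerProperties.AckMode) is assumed:
// Minimal sketch: manual offset commits (illustrative names, not part of the original post)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualKafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-group");
    // Disable auto-commit so the listener decides when the offset is committed
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    // Commit immediately when Acknowledgment.acknowledge() is called
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

@KafkaListener(topics = "manual-demo", containerFactory = "manualKafkaListenerContainerFactory")
public void receiveWithManualAck(String message, Acknowledgment ack) {
    // ... business logic ...
    ack.acknowledge(); // commit the offset only after processing succeeds
}
With MANUAL_IMMEDIATE, the offset is committed only when acknowledge() is called, so a record whose processing fails is not marked as consumed and can be redelivered.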
pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
consumerConfig
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* Single-threaded, single-record consumption
* @return
*/
@Bean
public KafkaListenerContainerFactory<?> stringKafkaListenerContainerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic");
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
return factory;
}
}
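Note: the factory above reads the broker list from the `kafka.bootstrap-servers` property, so an entry along the lines of `kafka.bootstrap-servers=localhost:9092` (the address here is only an example) is assumed in application.properties.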
consumer
@Component
public class KafkaReceiver {
private static Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
//@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
public void receiveString(String message) {
logger.info("Message : %s" +message);
}
/**
* Get the message headers and payload via annotations.
*
* @Payload: the message payload, i.e. the content that was sent
* @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY): the key the message was sent with
* @Header(KafkaHeaders.RECEIVED_PARTITION_ID): the partition the message was received from
* @Header(KafkaHeaders.RECEIVED_TOPIC): the name of the topic being listened to
* @Header(KafkaHeaders.RECEIVED_TIMESTAMP): the message timestamp
*
*/
//@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
public void receive(@Payload String message,
//@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
logger.info("topic : " +topic);
logger.info("partition : " +partition);
//logger.info("key : " +key.toString());
logger.info("TIMESTAMP : " +ts);
logger.info("message : " +message);
}
/**
* Consume from a specified partition starting at a given offset.
*
* @TopicPartition: topic -- the topic name to listen to; partitions -- the partition ids to listen to;
*                  partitionOffsets -- start listening from a given offset
* @PartitionOffset: partition -- a single partition id (not an array); initialOffset -- the initial offset
*/
@KafkaListener(containerFactory = "stringKafkaListenerContainerFactory",
topicPartitions = @TopicPartition(topic = "hello", partitionOffsets = @PartitionOffset(partition = "0" , initialOffset = "2")))
public void receiveFromBegin(@Payload String payload,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(String.format("Read all message from partition %d : %s", partition, payload));
}
/**
* Receive the full ConsumerRecord
*
* @param record
*/
//@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> record) {
System.out.println("Message is :" + record.toString());
}
}
Batch consumption
Enabling batch consumption takes three steps (the configuration and listener below implement them):
1. Consumer configuration: set max.poll.records
2. Listener container factory: enable batch listening with factory.setBatchListener(true);
3. Listener method: receive a list of records, e.g. public void consumerBatch(List<ConsumerRecord<?, ?>> records)
javaConfig
@Configuration
@EnableKafka
public class BatchConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* Multi-threaded, batch consumption
* @return
*/
@Bean
public KafkaListenerContainerFactory<?> batchFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Control concurrent consumption. The concurrency should match the partition count: if the topic has
// 3 partitions, setConcurrency(3) gives 3 consumer threads and speeds up consumption. If
// setConcurrency is not called, the container defaults to a single consumer thread.
factory.setConcurrency(10);
// Poll timeout
factory.getContainerProperties().setPollTimeout(1500);
// Enable batch consumption; the number of records per batch is controlled by the
// Kafka consumer property max.poll.records (set in consumerConfigs() below)
factory.setBatchListener(true);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
* Consumer configuration
* @return
*/
public Map<String, Object> consumerConfigs() {
Map<String, Object> configProps = new HashMap<>();
// There is no need to list every broker; the rest of the cluster is discovered automatically.
// It is best to specify more than one in case a server goes down.
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Key deserializer
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Value deserializer
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// GroupID
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// Maximum number of records returned per poll (the batch size)
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
// Auto-commit offsets
// If true, offsets are committed automatically at the frequency set by auto.commit.interval.ms.
// If false, offsets are not committed on a timer; you commit them yourself once a message is truly
// considered consumed. This is useful when consuming a message involves further processing logic:
// the message should not be treated as consumed until that processing has completed.
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Auto-commit frequency
configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// Session timeout
configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// What the consumer should do when it reads a partition that has no committed offset, or whose offset is invalid:
// latest (default): start from the newest records (those produced after the consumer started)
// earliest: start from the beginning of the partition
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return configProps;
}
}
Consumer: BatchConsumer
@Component
@Slf4j
public class BatchConsumer {
/**
* Batch message consumption
* @param records
*/
@KafkaListener(topics = "hello", containerFactory="batchFactory")
public void consumerBatch(List<ConsumerRecord<?, ?>> records){
log.info("鎺ユ敹鍒版秷鎭暟閲忥細{}",records.size());
for(ConsumerRecord record: records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
log.info("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
System.out.println("鎺ユ敹鍒版秷鎭細" + message);
}
}
}
}
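If ENABLE_AUTO_COMMIT_CONFIG is switched to "false" in consumerConfigs(), the batch listener can acknowledge each batch itself. Below is a minimal sketch, assuming a factory named batchManualFactory (a made-up name) that is configured like batchFactory() above but with auto-commit disabled and the ack mode set to MANUAL_IMMEDIATE:
// Sketch: batch consumption with manual acknowledgment.
// Assumed factory (not shown): same as batchFactory(), plus
//   configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//   factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
@KafkaListener(topics = "hello", containerFactory = "batchManualFactory")
public void consumerBatchManual(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    log.info("Number of records received: {}", records.size());
    for (ConsumerRecord<?, ?> record : records) {
        // ... process each record ...
    }
    // Commit the offsets for the whole batch only after every record has been processed
    ack.acknowledge();
}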
Reference: https://blog.csdn.net/yy756127197/article/details/103895413