java [kafka] kafka with springboot #kafka #springboot


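/** Consumer */
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaReceiver {

    // Listens on topic "akernel-int" as a member of consumer group "akernel-settle"
    @KafkaListener(topics = {"akernel-int"}, groupId = "akernel-settle")
    public void listen(ConsumerRecord<?, ?> record) {
        // A record's value can be null (for example, a tombstone), so check before logging
        Object message = record.value();
        if (message != null) {
            log.info("----------------- record = {}", record);
            log.info("------------------ message = {}", message);
        }
    }
}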

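/** Producer */
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.Date;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private final Gson gson = new GsonBuilder().create();

    // Builds a sample message, serializes it to JSON, and sends it to topic "akernel-int"
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        String json = gson.toJson(message);
        log.info("+++++++++++++++++++++  message = {}", json);
        kafkaTemplate.send("akernel-int", json);
    }
}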

/** Message body */
import java.util.Date;
import lombok.Data;

@Data
public class Message {
    private Long id;
    private String msg;
    private Date sendTime;
}

/** Required pom dependency */
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>
        
/** application.properties */
#============== kafka ===================
# Kafka broker addresses; several can be listed, separated by commas
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

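#=============== producer  =======================
# Number of retries for a failed send (0 disables retries)
spring.kafka.producer.retries=0
# Maximum size of a producer batch, in bytes (not a message count)
spring.kafka.producer.batch-size=16384
# Total memory, in bytes, the producer may use to buffer unsent records
spring.kafka.producer.buffer-memory=33554432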

# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# Default consumer group id
spring.kafka.consumer.group-id=akernel-settle

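# Start from the earliest offset when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
# Commit offsets automatically, every 100 ms
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100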

# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
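
The Spring Boot keys above are passed on to the underlying Kafka client under its own property names. As a rough illustration of that correspondence, here is a minimal sketch that builds the equivalent client settings by hand with plain `java.util.Properties`. The class `KafkaClientProps` and its methods are hypothetical names invented for this example; Spring Boot performs this mapping itself, so nothing like this is needed in the application. The keys on the left (`bootstrap.servers`, `batch.size`, and so on) are the standard Kafka client configuration names.

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper, for illustration only: mirrors the application.properties
// values above using the native Kafka client configuration keys.
public class KafkaClientProps {

    static final String BOOTSTRAP = "localhost:9092,localhost:9093,localhost:9094";

    // Equivalent of the spring.kafka.producer.* settings
    public static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", BOOTSTRAP);
        p.setProperty("retries", "0");
        p.setProperty("batch.size", "16384");       // bytes, not a message count
        p.setProperty("buffer.memory", "33554432"); // bytes
        p.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Equivalent of the spring.kafka.consumer.* settings
    public static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", BOOTSTRAP);
        p.setProperty("group.id", "akernel-settle");
        p.setProperty("auto.offset.reset", "earliest");
        p.setProperty("enable.auto.commit", "true");
        p.setProperty("auto.commit.interval.ms", "100");
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        // Print the producer settings in sorted key order
        Properties p = producerProps();
        for (String key : new TreeSet<>(p.stringPropertyNames())) {
            System.out.println(key + "=" + p.getProperty(key));
        }
    }
}
```

Either `Properties` object could be handed to a plain `KafkaProducer` or `KafkaConsumer` constructor if the Kafka client were used without Spring.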
