Springboot??????Kfka

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot??????Kfka相关的知识,希望对你有一定的参考价值。

?????????zook   static   art   wired   1.5   comm   over   manage   autowired   

1.?????????pom??????????????????

The managed version is 1.1.7.RELEASE The artifact is managed in org.springframework.boot:spring-boot-dependencies:1.5.9.RELEASE

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.yml?????????????????????

spring:
  datasource:
    url: jdbc:postgresql://127.0.0.1:5432/dev
    username: dev
    password: dev
  dubbo:
    registry:
      address: zookeeper://127.0.0.1:2181
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: grp1
                   

3.Consumer java???

@Component
public class JobTrackingConsumer implements BaseConsumer{

    private final static Logger logger = LoggerFactory.getLogger(JobTrackingConsumer.class);

    @Autowired
    private JobBasicCache cache;
    
    @KafkaListener(topics = {CommonConstant.FTP_CATEGPRY_ID, CommonConstant.HTTP_CATEGPRY_ID})
    @Override
    public void consume(ConsumerRecord<?, ?> consumer) {
        Optional<String> kafkaMessage = (Optional<String>) Optional.ofNullable(consumer.value());
        if (kafkaMessage.isPresent()) {
            logger.info("topic:{}, msg:{}", consumer.topic(), kafkaMessage.get());
            cache.push(consumer.topic().toString(), kafkaMessage.get());
        }
    }

}

 

以上是关于Springboot??????Kfka的主要内容,如果未能解决你的问题,请参考以下文章

全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段

SpringBoot中表单提交报错“Content type ‘application/x-www-form-urlencoded;charset=UTF-8‘ not supported“(代码片段

Spring boot:thymeleaf 没有正确渲染片段

11SpringBoot-CRUD-thymeleaf公共页面元素抽取

学习小片段——springboot 错误处理

springboot 底层点的知识