kafka 集成整合外部插件(springboot,flume,flink,spark)

Posted 健康平安的活着

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka 集成整合外部插件(springboot,flume,flink,spark)相关的知识,希望对你有一定的参考价值。

一 kafka集成springboot

1.工程结构

 2.pom文件

   <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.1</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
<!--springboot 启动 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>

3.resouce配置文件

spring.application.name=kf-demo
# 指定 kafka 的地址
spring.kafka.bootstrapservers=192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
#指定 key 和 value 的序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer

# =========消费者配置开始=========
# 指定 kafka 的地址
#spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=test2

4.生产者

package com.ljf.spring.boot.demo.producerspt;

import com.ljf.spring.boot.demo.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName: ProducerSpt
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/12 08:16:35
 * @Version: V1.0
 **/
@RestController
public class ProducerSpt 
    // Kafka 模板用来向 kafka 发送数据
    @Autowired
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/send")
    public String data(String msg) 
        Map map= new HashMap<>();
        map.put("name","beijing");
        map.put("time", DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss"));
        map.put("msg",msg);
        kafka.send("kafka-ljf", map.toString());
        return "ok";
    

 5.配置消费者

package com.ljf.spring.boot.demo.consumerspt;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @ClassName: KafkaConsumer
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/12 08:21:59
 * @Version: V1.0
 **/
@Configuration
public class KafkaConsumer 
    // 指定要监听的 topic
    @KafkaListener(topics = "kafka-ljf")
    public void consumeTopic(String msg)  // 参数: 收到的 value
        System.out.println("收到的信息: " + msg);
    

 6.启动zk,kafka集群,启动程序

 启动程序

 

7.测试

生产端:

 消费端:

二  kafka集成flume

1.作为生产者

 2.作为消费者

 具体代码实现见百度网盘pdf

 

三  kafka集成flink

1.作为生产者

 2.作为消费者

见百度网盘

 

四  集成spark

 资料见百度网盘

 

以上是关于kafka 集成整合外部插件(springboot,flume,flink,spark)的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot集成整合pageHelper分页插件

Day548.Kafka相关外部系统整合 -kafka

SpringBoot基础集成插件

springboot整合系列

spring boot怎么启动kafka

SpringBoot整合Kafka