kafka 集成整合外部插件(springboot,flume,flink,spark)
Posted 健康平安的活着
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka 集成整合外部插件(springboot,flume,flink,spark)相关的知识,希望对你有一定的参考价值。
一 kafka集成springboot
1.工程结构
2.pom文件
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!--springboot 启动 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
3.resouce配置文件
spring.application.name=kf-demo
# 指定 kafka 的地址
spring.kafka.bootstrapservers=192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
#指定 key 和 value 的序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
# =========消费者配置开始=========
# 指定 kafka 的地址
#spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=test2
4.生产者
package com.ljf.spring.boot.demo.producerspt;
import com.ljf.spring.boot.demo.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName: ProducerSpt
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/12 08:16:35
* @Version: V1.0
**/
@RestController
public class ProducerSpt
// Kafka 模板用来向 kafka 发送数据
@Autowired
KafkaTemplate<String, String> kafka;
@RequestMapping("/send")
public String data(String msg)
Map map= new HashMap<>();
map.put("name","beijing");
map.put("time", DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss"));
map.put("msg",msg);
kafka.send("kafka-ljf", map.toString());
return "ok";
5.配置消费者
package com.ljf.spring.boot.demo.consumerspt;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
/**
* @ClassName: KafkaConsumer
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/12 08:21:59
* @Version: V1.0
**/
@Configuration
public class KafkaConsumer
// 指定要监听的 topic
@KafkaListener(topics = "kafka-ljf")
public void consumeTopic(String msg) // 参数: 收到的 value
System.out.println("收到的信息: " + msg);
6.启动zk,kafka集群,启动程序
启动程序
7.测试
生产端:
消费端:
二 kafka集成flume
1.作为生产者
2.作为消费者
具体代码实现见百度网盘pdf
三 kafka集成flink
1.作为生产者
2.作为消费者
见百度网盘
四 集成spark
资料见百度网盘
以上是关于kafka 集成整合外部插件(springboot,flume,flink,spark)的主要内容,如果未能解决你的问题,请参考以下文章