flink使用kafka作为source和sink
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink使用kafka作为source和sink相关的知识,希望对你有一定的参考价值。
参考技术A 然后,在kafka里使用生产者脚本,flink就可以获取到,并且回写到kafka另一个topic中。Apache Flink:使用Apache Kafka作为Sink的简单demo(数据结果存放地方)
1. 声明
当前内容主要为测试和使用Flink,将数据读取处理后放入到kafka的topic中
主要内容:
- 使用Flink读取文本内容
- 过滤读取的内容
- 将读取的内容放入kafka中
2.基本demo
1.pom依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- This dependency is provided, because it should not be packaged into
the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<!-- 直接导入需要的flink到kafka的连接器 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
2.开启一个命令行消费者
3.开始编写demo
import java.util.Properties;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
/**
*
* @author hy
* @createTime 2021-05-23 10:38:40
* @description 当前内容主要将Apache Flink中处理的内容放入当前的Apache Kafka中(将Kafka作为数据输出的地点)
*
*/
public class LocalRunningToKafkaSaveTest {
public static void main(String[] args) {
// 采用本地模式
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// 设定数据来源为集合数据
String topic = "test-events"; // 设置监听的主题
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.1.101:9092");
props.setProperty("group.id", "test");
String filePath = "D:\\\\eclipse-workspace\\\\Apache-Flink-Start\\\\resources\\\\abc.txt";
DataStream<String> txtStream = env.readTextFile(filePath);
// 这里可以进行处理操作(但是结果需要赋值)
txtStream = txtStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
// TODO Auto-generated method stub
// 返回数据中不带有19的数据
if(!value.contains("19")) {
return true;
}
return false;
}
});
System.out.println("处理后的数据=====>");
txtStream.print();
System.out.println("<=====处理后的数据");
// 将读取的文本类型的数据注入到kafka的这个topic中
FlinkKafkaProducer<String> producer=new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(),props);
txtStream.addSink(producer);
System.out.println("将当前的数据处理后放入kafka的test-events这个topic成功!");
try {
// 最后开始执行
JobExecutionResult result = env.execute("Fraud Detection");
if (result.isJobExecutionResult()) {
System.out.println("执行完毕......");
}
System.out.println();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
小心这里的filter,以及使用的Sink为FlinkKafkaProducer(一个消息生产者)
3. 测试结果
命令行消费者结果:
4. 总结
1.由于本人没有找到一个清空topic的消息的,只要是订阅了的,以前发布的消息都会发送,并且接受到,只有执行删除topic的指令,然后在创建才有用
./bin/kafka-topics.sh --delete --topic test-events --bootstrap-server 192.168.1.101:9092
以上是关于flink使用kafka作为source和sink的主要内容,如果未能解决你的问题,请参考以下文章
使Flink SQL Kafka Source支持独立设置并行度
开发者干货 | 当Flink遇到Kafka - FlinkKafkaConsumer使用详解