Apache Flink:使用Apache Kafka作为DataSource的简单demo

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink:使用Apache Kafka作为DataSource的简单demo相关的知识,希望对你有一定的参考价值。

1. 声明

当前内容主要为测试和使用Apache Kafka作为Flink的DataSource的最基本的demo

当前内容主要参考:Flink官方文档Kafka官方文档

其中使用kafka发送消息的参考:博文

当前内容为

  1. 使用Apache Flink接收Kafka的topic中的消息作为DataSource
  2. 简单的print当前接收的消息

2. 基本pom依赖

核心为:flink-connector-kafka_2.12

<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.13.0</flink.version>
		<target.java.version>1.8</target.java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.12.1</log4j.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- This dependency is provided, because it should not be packaged into 
			the JAR file. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>

		<!-- Add connector dependencies here. They must be in the default scope 
			(compile). -->
		<!-- 直接导入需要的flink到kafka的连接器 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.12</artifactId>
			<version>1.13.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

3. 基本的demo

这里订阅的topic为test-events(只用设置这个即可)

import java.util.Properties;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * 
 * @author hy
 * @createTime 2021-05-23 10:38:40
 * @description 当前内容主要为测试和使用当前的Kafka作为Flink的数据源,开始接受来自Kafka的数据
 *
 */
public class LocalRunningKafkaDataSourceTest {
	public static void main(String[] args) {
		// 采用本地模式
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		// 设定数据来源为集合数据
		String topic = "test-events"; //设置监听的主题
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "192.168.1.101:9092");
		props.setProperty("group.id", "test");
		DataStream<String> kafkaStream = env
				.addSource(new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props));
		kafkaStream.print();

		try {
			// 最后开始执行
			JobExecutionResult result = env.execute("Fraud Detection");
			if (result.isJobExecutionResult()) {
				System.out.println("执行完毕......");
			}
			System.out.println();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

4. 测试

在这里插入图片描述
在这里插入图片描述

上面的部分发送有的是测试了kafka的自己接受消息的没有关,后面追加的才是对的(没有数据丢失的问题)

以上是关于Apache Flink:使用Apache Kafka作为DataSource的简单demo的主要内容,如果未能解决你的问题,请参考以下文章

Flink从入门到精通100篇(二十一)-Apache Flink 与 Apache Hive 的集成

Apache Flink 入门,了解 Apache Flink

Apache Flink 入门,了解 Apache Flink

Apache Flink 入门,了解 Apache Flink

Apache Flink:使用Apache Kafka作为Sink的简单demo(数据结果存放地方)

apache flink 入门