flink-kafka simple example (Java): map, filter, sink
Posted by smile-yan
Problem description
I worked with Flink during an internship a long time ago and did quite a lot in this area, yet a few years later, setting up kafka-flink again turned out to be surprisingly hard. I happened to find the notes I wrote back then, so I am summarizing and publishing them here in the hope that they help someone else.
What it does
First, one point must be stressed: the code below does not run inside Flink as a Flink job; it can, however, be packaged into a jar and submitted to Flink.
Since this is an entry-level project for local testing, meant to verify the Kafka setup, running it locally is enough.
Once the local IDE starts the program, it sits listening to Kafka; then log into the cloud server, start the Kafka console producer script, and type messages into Kafka by hand. In the IDE you see all of those messages consumed and printed, as shown in the figure:
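For reference, with kafka_2.13-3.0.0 the console producer is started from the Kafka installation directory roughly like this (the topic name metric matches the code below; localhost is a placeholder for your broker address):
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic metric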
Source code
源码地址:https://gitee.com/smile-yan/flink-kafka-demo.git
Environment:
- Kafka version: kafka_2.13-3.0.0, although little here actually depends on the Kafka version; any reasonably close version should work.
- Oracle JDK 1.8
- Flink: flink-1.12.5. To stay consistent with the pom (which pins Flink 1.12.1), it is safest to stay on the 1.12 line; other versions may work but are untested here.
The pom file is below. Note: keep the Kafka and Flink versions consistent with each other, and do not casually bump just one of them.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.smileyan.demo</groupId>
<artifactId>flink-kafka-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<influxdb.java.version>2.15</influxdb.java.version>
<okhttp.version>3.13.1</okhttp.version>
<flink.version>1.12.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
</properties>
<profiles>
<profile>
<id>local</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<flink.scope>compile</flink.scope>
</properties>
</profile>
<profile>
<id>prod</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<flink.scope>provided</flink.scope>
</properties>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.69</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
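The two profiles only switch the scope of the Flink dependencies: local keeps them at compile so the job runs directly in the IDE, while prod marks them provided because a Flink cluster already ships those jars on its classpath. So a jar intended for cluster submission would be built with the prod profile, for example:
mvn clean package -Pprod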
Next comes a Java file; the source is as follows:
package cn.smileyan.demo;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
/**
 * Author: smileyan
 * Reference blog: https://smileyan.blog.csdn.net/article/details/121044085
 */
public class HelloFlink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        // replace <server-address> with your Kafka broker's host or IP
        props.put("bootstrap.servers", "<server-address>:9092");
        // zookeeper.connect is ignored by modern Kafka clients; kept from the original setup
        props.put("zookeeper.connect", "<server-address>:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        // consume the "metric" topic and print every message
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>(
                "metric",
                new SimpleStringSchema(),
                props)).setParallelism(1);
        dataStreamSource.print();
        env.execute("Flink add data source");
    }
}
The running effect is as shown in the first figure above. To repeat: the figure shows the local IDE listening to Kafka after startup; the project was not packaged into a jar and submitted to Flink for that run. This source can, however, be packaged and submitted to Flink (tested and working).
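For reference, submitting the shaded jar to a Flink cluster uses the standard flink run command; the jar name below simply follows the pom's artifactId and version:
./bin/flink run -c cn.smileyan.demo.HelloFlink target/flink-kafka-demo-1.0-SNAPSHOT.jar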
map: processing the data a step further
Assume the input messages are numbers; the goal is to output the square of each one.
// additional imports needed: org.apache.flink.api.common.functions.MapFunction,
// org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "<server-address>:9092");
props.put("zookeeper.connect", "<server-address>:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
SingleOutputStreamOperator<Long> metric = env.addSource(new FlinkKafkaConsumer<String>(
        "metric",
        new SimpleStringSchema(),
        props))
        .setParallelism(1)
        .map(new MapFunction<String, Long>() {
            @Override
            public Long map(String s) throws Exception {
                long x = Long.parseLong(s);
                return x * x;   // output the square of the input number
            }
        });
metric.print();
env.execute("Flink add data source");
filter: adding filtering
The idea now is to map only the inputs that are numbers and filter everything else out. This requires implementing the FilterFunction interface (org.apache.flink.api.common.functions.FilterFunction). The original snippet breaks off mid-chain, so the filter body below is a reconstruction of the behavior just described:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "<server-address>:9092");
props.put("zookeeper.connect", "<server-address>:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
SingleOutputStreamOperator<Long> metric = env
        .addSource(new FlinkKafkaConsumer<String>("metric", new SimpleStringSchema(), props))
        .setParallelism(1)
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                // keep only messages that parse as a (possibly negative) integer
                return s != null && s.matches("-?\\d+");
            }
        })
        .map(new MapFunction<String, Long>() {
            @Override
            public Long map(String s) throws Exception {
                long x = Long.parseLong(s);
                return x * x;
            }
        });
metric.print();
env.execute("Flink add data source");
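sink: writing the results out
The original notes end before the sink example, so what follows is only a minimal sketch, not the original author's code. It assumes an output topic named "metric-out" exists and reuses the props from above; FlinkKafkaProducer ships with the same flink-connector-kafka dependency (import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer). Place this before env.execute(...).
metric.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return String.valueOf(value);   // serialize the squared number back to a string
            }
        })
        // "metric-out" is a hypothetical topic name; adjust it to your setup
        .addSink(new FlinkKafkaProducer<String>("metric-out", new SimpleStringSchema(), props));
Using Kafka as the sink keeps the demo to a single external dependency; the elasticsearch6 connector in the pom hints at an alternative sink target if you want to go further.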