flink-kafka simple example (Java): map, filter, sink

Posted by smile-yan



Background

I worked with Flink quite a bit during an internship long ago, yet years later, setting up kafka-flink again turned out to be surprisingly difficult. I happened to dig up the notes I wrote back then, so I am summarizing and publishing them here in the hope that they help others too.

What it looks like

One thing must be stressed first: the code below does not run inside Flink as a Flink job. It can, however, be packaged into a jar and submitted to Flink to run there.

For local testing, an introductory project like this one, whose purpose is just to verify Kafka, can simply be run locally.

Start the program from the local IDE and it begins listening to Kafka. Then log into the cloud server, start the Kafka producer script, and type messages into Kafka by hand. In the IDE you will see those messages consumed and printed out. (The screenshot from the original post is omitted here.)
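For reference, the producer script mentioned above is the console producer that ships with Kafka; with Kafka 3.x it can be started roughly like this (the broker host "server-address" and the topic name metric are the values assumed throughout this post):

bin/kafka-console-producer.sh --bootstrap-server server-address:9092 --topic metric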

Source code

Source repository: https://gitee.com/smile-yan/flink-kafka-demo.git

Environment:

  • Kafka version: kafka_2.13-3.0.0, although this example does not depend strongly on the Kafka version; any reasonably close version should work.
  • Oracle JDK 1.8
  • Flink: flink-1.12.5. To stay consistent with the pom, the 1.12 line is the safest choice; other versions may work but should be tested first.

The pom file is below. Note that the Kafka and Flink artifact versions must stay mutually consistent; do not casually change any single one of them.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.smileyan.demo</groupId>
    <artifactId>flink-kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
        <influxdb.java.version>2.15</influxdb.java.version>
        <okhttp.version>3.13.1</okhttp.version>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
    </properties>
    <profiles>
        <profile>
            <id>local</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <flink.scope>compile</flink.scope>
            </properties>
        </profile>
        <profile>
            <id>prod</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <properties>
                <flink.scope>provided</flink.scope>
            </properties>
        </profile>
    </profiles>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.69</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
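A note on the two profiles: the default local profile keeps the Flink dependencies at compile scope so the job runs directly in the IDE, while prod marks them provided so the shade plugin leaves them out of the fat jar (a cluster already ships them). Assuming a standard Maven installation, the two builds would look like this:

# run and debug locally (default profile)
mvn clean package

# build a jar for submission to a Flink cluster
mvn clean package -Pprod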

Next comes the Java file; the source is as follows:

package cn.smileyan.demo;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * Author: smileyan
 * Reference: https://smileyan.blog.csdn.net/article/details/121044085
 */
public class HelloFlink {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        // replace "server-address" with your broker / zookeeper host
        props.put("bootstrap.servers", "server-address:9092");
        props.put("zookeeper.connect", "server-address:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        // consume the "metric" topic as a stream of plain strings
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>(
                "metric",
                new SimpleStringSchema(),
                props)).setParallelism(1);

        dataStreamSource.print();
        env.execute("Flink add data source");
    }
}

The running behavior is exactly what was described at the start (the original post shows a screenshot). To repeat: that output comes from starting the program locally in the IDE and listening to Kafka; it was not packaged into a jar and submitted to Flink. The same source can, however, be packaged, submitted to Flink, and run there (tested and working).
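As a sketch, submitting the shaded jar built with the prod profile to a standalone Flink cluster would look roughly like this (the jar path depends on your build):

./bin/flink run /path/to/flink-kafka-demo-1.0-SNAPSHOT.jar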

Processing the data further with map

Assume the input messages are numbers; the goal now is to output the square of each one.

// additional imports needed:
// import org.apache.flink.api.common.functions.MapFunction;
// import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.put("bootstrap.servers", "server-address:9092");
props.put("zookeeper.connect", "server-address:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

// map each input string to the square of the number it contains
SingleOutputStreamOperator<Long> metric = env.addSource(new FlinkKafkaConsumer<String>(
                "metric",
                new SimpleStringSchema(),
                props))
        .setParallelism(1)
        .map(new MapFunction<String, Long>() {
            @Override
            public Long map(String s) throws Exception {
                long x = Long.parseLong(s);
                return x * x;
            }
        });
metric.print();
env.execute("Flink add data source");
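A side note not in the original post: the same map can be written with a Java lambda, but Flink then needs an explicit type hint via returns, because the generic return type is erased at compile time. A minimal sketch, reusing env and props from above:

// lambda form of the same map; Types.LONG supplies the type
// information lost to Java type erasure
// import org.apache.flink.api.common.typeinfo.Types;
SingleOutputStreamOperator<Long> squared = env
        .addSource(new FlinkKafkaConsumer<String>("metric", new SimpleStringSchema(), props))
        .setParallelism(1)
        .map((String s) -> {
            long x = Long.parseLong(s);
            return x * x;
        })
        .returns(Types.LONG);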

Adding a filter

The idea now is to map only the inputs that are numbers and filter everything else out.

This requires implementing the FilterFunction interface. The code is as follows (the original snippet was cut off here; the tail below is reconstructed to match the map example above):

// additional import needed:
// import org.apache.flink.api.common.functions.FilterFunction;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.put("bootstrap.servers", "server-address:9092");
props.put("zookeeper.connect", "server-address:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

SingleOutputStreamOperator<Long> metric = env.addSource(new FlinkKafkaConsumer<String>(
               "metric",
               new SimpleStringSchema(),
               props))
       .setParallelism(1)
       // keep only messages that parse as numbers
       .filter(new FilterFunction<String>() {
           @Override
           public boolean filter(String s) {
               try {
                   Long.parseLong(s);
                   return true;
               } catch (NumberFormatException e) {
                   return false;
               }
           }
       })
       // square the numbers that passed the filter
       .map(new MapFunction<String, Long>() {
           @Override
           public Long map(String s) {
               long x = Long.parseLong(s);
               return x * x;
           }
       });
metric.print();
env.execute("Flink add data source");

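Finally, about the sink in the title: print() is itself Flink's built-in print sink. Purely as a sketch (this part is not in the original code), a custom sink can be attached with addSink instead, for example to forward each squared value to your own storage:

// minimal custom sink, attached to the metric stream from above
// import org.apache.flink.streaming.api.functions.sink.SinkFunction;
metric.addSink(new SinkFunction<Long>() {
    @Override
    public void invoke(Long value, Context context) {
        // replace this with a write to a database, a file, etc.
        System.out.println("sink received: " + value);
    }
});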