FLINK 基于1.15.2的Java开发-入门

Posted TGITCIC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-入门相关的知识,希望对你有一定的参考价值。

下载:Apache Flink: Downloads

下载后进入bin目录,如果是单机直接运行:

./start-cluster.sh

运行后打开IE,输入http://localhost:8081

得到以下界面:

书写第一个例子,需求:读入一个.txt文件,按照空格统计每个词的重复次数,然后输出到一个output文件夹。

JDK:OpenJDK11(必须使用此JDK)

Flink:1.15.2

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aldi.flink.demo</groupId>
    <artifactId>SinkToRedis</artifactId>
    <version>0.0.1</version>
    <properties>
        <jcuda.version>10.0.0</jcuda.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
        <curator-framework.version>4.0.1</curator-framework.version>
        <curator-recipes.version>2.8.0</curator-recipes.version>
        <guava.version>27.0.1-jre</guava.version>
        <fastjson.version>1.2.59</fastjson.version>
        <jackson-databind.version>2.11.1</jackson-databind.version>
        <hutool-crypto.version>5.0.0</hutool-crypto.version>
        <!-- Maven property placeholders must use the ${...} form, otherwise the
             literal text "$java.version" is passed to the compiler plugin. -->
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <compiler.plugin.version>3.8.1</compiler.plugin.version>
        <war.plugin.version>3.2.3</war.plugin.version>
        <jar.plugin.version>3.1.1</jar.plugin.version>
        <poi.version>4.1.0</poi.version>
        <poi-ooxml.version>4.1.0</poi-ooxml.version>
        <poi-ooxml-schemas.version>4.1.0</poi-ooxml-schemas.version>
        <dom4j.version>1.6.1</dom4j.version>
        <log4j2.version>2.17.1</log4j2.version>
        <commons-lang3.version>3.4</commons-lang3.version>
        <common-util.version>0.0.1</common-util.version>
        <flink.version>1.15.2</flink.version>
    </properties>
    <dependencies>
        <!-- core dependencies -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- test dependencies -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- NOTE(review): bahir redis connector is built against Scala 2.11 while the
             Flink artifacts above use Scala 2.12 — verify this combination works. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Builds the fat (uber) jar submitted to the Flink cluster.
                 Flink's packaging docs require maven-shade-plugin 3.1.1 or newer;
                 1.2.1 predates the transformers used below. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Sets Main-Class in MANIFEST.MF so the jar is runnable. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aldi.flink.demo.SinkToRedis</mainClass>
                                </transformer>
                                <!-- Merges reference.conf files (Akka config) instead of overwriting. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 WordCount.java

/**
 * 系统项目名称 com.aldi.flink.demo.hello WordCount.java
 *
 * 2022年9月22日-上午11:28:33 2022XX公司-版权所有
 *
 */
package com.aldi.flink.demo.hello;
 
import java.time.Duration;
 
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
 
/**
 *
 * WordCount
 *
 *
 * 2022年9月22日 上午11:28:33
 *
 * @version 1.0.0
 *
 */
public class WordCount 
 
    /**
     * main(这里用一句话描述这个方法的作用) (这里描述这个方法适用条件 – 可选)
     *
     * @param args
     *            void
     * @exception
     * @since 1.0.0
     */
    public static void main(String[] args) throws Exception 
        // 1、创建执行环境
        // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、读取数据
        String path = "/Users/chrishu123126.com/opt/datawarehouse/sample/wordcount/words.txt";
        String outputPath = "/Users/chrishu123126.com/opt/datawarehouse/sample/wordcount/output";
        DataStream<String> text;
        // DataSet -> Operator -> DataSource
        FileSource.FileSourceBuilder<String> builder =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(path));
        text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
        DataStream<Tuple2<String, Integer>> counts =
            // The text lines read from the source are split into words
            // using a user-defined function. The tokenizer, implemented below,
            // will output each word as a (2-tuple) containing (word, 1)
            text.flatMap(new Tokenizer()).name("tokenizer")
                // keyBy groups tuples based on the "0" field, the word.
                // Using a keyBy allows performing aggregations and other
                // stateful transformations over data on a per-key basis.
                // This is similar to a GROUP BY clause in a SQL query.
                .keyBy(value -> value.f0)
                // For each key, we perform a simple sum of the "1" field, the count.
                // If the input data stream is bounded, sum will output a final count for
                // each word. If it is unbounded, it will continuously output updates
                // each time it sees a new instance of each word in the stream.
                .sum(1).name("counter");
        counts.sinkTo(FileSink.<Tuple2<String, Integer>>forRowFormat(new Path(outputPath), new SimpleStringEncoder<>())
            .withRollingPolicy(DefaultRollingPolicy.builder().withMaxPartSize(MemorySize.ofMebiBytes(10))
                .withRolloverInterval(Duration.ofSeconds(10)).build())
            .build()).name("file-sink");
        env.execute("WordCount");
        System.out.println("done");
    
 
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> 
 
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) 
            // normalize and split the line
            String[] tokens = value.toLowerCase().split(" ");
 
            // emit the pairs
            for (String token : tokens) 
                if (token.length() > 0) 
                    out.collect(new Tuple2<>(token, 1));
                
            
        
    

然后使用maven打包,此处我们使用的是flink推荐的maven-shade-plugin打包:

mvn clean install

得到这样的一个包

进入flink的web ui界面

使用【Add New】按钮上传这个包。

由于是单机Parallelism里填1或者不填(默认为集群数),如果是3机集群可以填3(>3倍计算效率)。

然后你可以看到这个job已经运行完毕了。

我们在我们的output目录得到以下这些输出,打开这些输出就可以看到这些统计了。

也可以sink(输出)到hdfs上去。

完成第一课。 

以上是关于FLINK 基于1.15.2的Java开发-入门的主要内容,如果未能解决你的问题,请参考以下文章

FLINK 基于1.15.2的Java开发-自定义Source端

FLINK 基于1.15.2的Java开发-在flink内如何使用log4j

FLINK 基于1.15.2的Java开发-搭建2主3从的生产集群环境

FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜

FLINK 基于1.15.2的Java开发-Sink到MYSQL的两种姿势

FLINK 基于1.15.2的Java开发-自定义Redis Sink用于连接 Redis Sentinel模式