FLINK 基于1.15.2的Java开发-入门
Posted TGITCIC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-入门相关的知识,希望对你有一定的参考价值。
下载后进入bin目录,如果是单机直接运行:
./start-cluster.sh
运行后打开浏览器,输入http://localhost:8081
得到以下界面:
书写第一个例子,需求:读入一个.txt文件,按照空格统计每个词的重复次数,然后输出到一个output文件夹。
JDK:OpenJDK11(必须使用此JDK)
Flink:1.15.2
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.aldi.flink.demo</groupId>
  <artifactId>SinkToRedis</artifactId>
  <version>0.0.1</version>
  <properties>
    <jcuda.version>10.0.0</jcuda.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>11</java.version>
    <curator-framework.version>4.0.1</curator-framework.version>
    <curator-recipes.version>2.8.0</curator-recipes.version>
    <guava.version>27.0.1-jre</guava.version>
    <fastjson.version>1.2.59</fastjson.version>
    <jackson-databind.version>2.11.1</jackson-databind.version>
    <hutool-crypto.version>5.0.0</hutool-crypto.version>
    <!-- Property placeholders must be written ${...}; a bare $name is NOT
         expanded by Maven and would be passed through literally. -->
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <compiler.plugin.version>3.8.1</compiler.plugin.version>
    <war.plugin.version>3.2.3</war.plugin.version>
    <jar.plugin.version>3.1.1</jar.plugin.version>
    <poi.version>4.1.0</poi.version>
    <poi-ooxml.version>4.1.0</poi-ooxml.version>
    <poi-ooxml-schemas.version>4.1.0</poi-ooxml-schemas.version>
    <dom4j.version>1.6.1</dom4j.version>
    <log4j2.version>2.17.1</log4j2.version>
    <commons-lang3.version>3.4</commons-lang3.version>
    <common-util.version>0.0.1</common-util.version>
    <flink.version>1.15.2</flink.version>
  </properties>
  <dependencies>
    <!-- core dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- test dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <!-- 1.2.1 (2009) predates Java 11; Flink's packaging docs use 3.x. -->
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- Entry point of this article's example job (was the
                       unrelated com.aldi.flink.demo.SinkToRedis). -->
                  <mainClass>com.aldi.flink.demo.hello.WordCount</mainClass>
                </transformer>
                <!-- Merge Akka/Flink reference.conf files instead of letting
                     one shaded jar entry overwrite the others. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
WordCount.java
/**
* 系统项目名称 com.aldi.flink.demo.hello WordCount.java
*
* 2022年9月22日-上午11:28:33 2022XX公司-版权所有
*
*/
package com.aldi.flink.demo.hello;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
/**
*
* WordCount
*
*
* 2022年9月22日 上午11:28:33
*
* @version 1.0.0
*
*/
public class WordCount
/**
* main(这里用一句话描述这个方法的作用) (这里描述这个方法适用条件 – 可选)
*
* @param args
* void
* @exception
* @since 1.0.0
*/
public static void main(String[] args) throws Exception
// 1、创建执行环境
// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、读取数据
String path = "/Users/chrishu123126.com/opt/datawarehouse/sample/wordcount/words.txt";
String outputPath = "/Users/chrishu123126.com/opt/datawarehouse/sample/wordcount/output";
DataStream<String> text;
// DataSet -> Operator -> DataSource
FileSource.FileSourceBuilder<String> builder =
FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(path));
text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
DataStream<Tuple2<String, Integer>> counts =
// The text lines read from the source are split into words
// using a user-defined function. The tokenizer, implemented below,
// will output each word as a (2-tuple) containing (word, 1)
text.flatMap(new Tokenizer()).name("tokenizer")
// keyBy groups tuples based on the "0" field, the word.
// Using a keyBy allows performing aggregations and other
// stateful transformations over data on a per-key basis.
// This is similar to a GROUP BY clause in a SQL query.
.keyBy(value -> value.f0)
// For each key, we perform a simple sum of the "1" field, the count.
// If the input data stream is bounded, sum will output a final count for
// each word. If it is unbounded, it will continuously output updates
// each time it sees a new instance of each word in the stream.
.sum(1).name("counter");
counts.sinkTo(FileSink.<Tuple2<String, Integer>>forRowFormat(new Path(outputPath), new SimpleStringEncoder<>())
.withRollingPolicy(DefaultRollingPolicy.builder().withMaxPartSize(MemorySize.ofMebiBytes(10))
.withRolloverInterval(Duration.ofSeconds(10)).build())
.build()).name("file-sink");
env.execute("WordCount");
System.out.println("done");
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>>
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
// normalize and split the line
String[] tokens = value.toLowerCase().split(" ");
// emit the pairs
for (String token : tokens)
if (token.length() > 0)
out.collect(new Tuple2<>(token, 1));
然后使用maven打包,此处我们使用的是flink推荐的maven-shade-plugin打包:
mvn clean install
得到这样的一个包
进入flink的web ui界面
使用【Add New】按钮上传这个包。
由于是单机,Parallelism里填1或者不填(默认为集群可用的slot数);如果是3机集群可以填3,以获得约3倍的并行计算能力。
然后你可以看到这个job已经运行完毕了。
我们在我们的output目录得到以下这些输出,打开这些输出就可以看到这些统计了。
也可以sink(输出)到hdfs上去。
完成第一课。
以上是关于FLINK 基于1.15.2的Java开发-入门的主要内容,如果未能解决你的问题,请参考以下文章
FLINK 基于1.15.2的Java开发-自定义Source端
FLINK 基于1.15.2的Java开发-在flink内如何使用log4j
FLINK 基于1.15.2的Java开发-搭建2主3从的生产集群环境
FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜