Installing Apache Flink on macOS and Testing WordCount
Posted by 江南独孤客
1. Install Java 1.8
steven@wangyuxiangdeMacBook-Pro ~ java -version
java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)
2. Install Flink
Install Flink with Homebrew:
brew install apache-flink
3. Verify the installation
steven@wangyuxiangdeMacBook-Pro ~ flink -v
Version: 1.13.2, Commit ID: 5f007ff
4. Check the Flink install directory
steven@wangyuxiangdeMacBook-Pro ~ brew info apache-flink
apache-flink: stable 1.13.2 (bottled), HEAD
Scalable batch and stream data processing
https://flink.apache.org/
/usr/local/Cellar/apache-flink/1.13.2 (164 files, 325.3MB) *
Poured from bottle on 2022-05-13 at 15:52:56
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/apache-flink.rb
License: Apache-2.0
==> Dependencies
Required: openjdk@11 ✔
==> Options
--HEAD
Install HEAD version
==> Analytics
install: 449 (30 days), 1,388 (90 days), 6,005 (365 days)
install-on-request: 451 (30 days), 1,392 (90 days), 5,997 (365 days)
build-error: 0 (30 days)
5. Enter the Flink install directory and start the cluster
cd /usr/local/Cellar/apache-flink/1.13.2/
./libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host wangyuxiangdeMacBook-Pro.local.
Starting taskexecutor daemon on host wangyuxiangdeMacBook-Pro.local.
6. Open the web UI at http://localhost:8081/ to confirm the cluster started successfully.
7. Stop the cluster
cd /usr/local/Cellar/apache-flink/1.13.2/
./libexec/bin/stop-cluster.sh
8. Write a streaming Flink job in Java. The code is as follows:
package com.dangbei.flink_test.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Test_WordCount {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Host and port of the socket to read from
        String host = "localhost";
        int port = 9000;

        // Read the raw text stream from the socket
        DataStream<String> inputLineDataStream = env.socketTextStream(host, port);

        // Split each line on whitespace, emit (word, 1) tuples, then aggregate
        DataStream<Tuple2<String, Integer>> resultStream = inputLineDataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Split on runs of whitespace
                        String[] wordArray = line.split("\\s+");
                        // Wrap every word into a (word, 1) tuple and emit it
                        for (String word : wordArray) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                // The stream is unbounded (the data is never "complete"),
                // so we key the stream with keyBy on the word field instead of using a group-by
                .keyBy(0)
                // Sum the counts in the second tuple position
                .sum(1);

        // Print the running (word, freq) counts
        resultStream.print();

        // Launch the streaming job
        env.execute();
    }
}
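The split-and-count logic inside the flatMap can be sanity-checked without a running cluster. Below is a minimal plain-Java sketch (no Flink dependency; the class name LocalWordCount and the count helper are made up for illustration) that reproduces the same tokenization and summing:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalWordCount {
    // Mirrors the job's flatMap + keyBy + sum: split each line on
    // whitespace and keep a running (word -> count) total.
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (word.isEmpty()) continue; // skip empty tokens from leading whitespace
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(new String[] {"hello flink", "hello world"});
        System.out.println(counts); // {hello=2, flink=1, world=1}
    }
}
```

To drive the real job, first open a socket source (for example with `nc -lk 9000` in a terminal), then run Test_WordCount; words typed into the nc session show up as running counts in the job's output.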
8.1 The pom.xml is configured as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dangbei</groupId>
<artifactId>flink_test</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.2</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<hadoop.version>3.0.0</hadoop.version>
<flink.shaded.version>9.0</flink.shaded.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
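The pom.xml is cut off above, so the remaining dependencies are not shown. For the streaming API used in the code (StreamExecutionEnvironment, socketTextStream), a Flink 1.13 project typically also declares the streaming and client artifacts. The fragment below is a sketch of what those entries usually look like (artifact names and versions follow the properties defined earlier; this is not necessarily the author's exact file):

```xml
<!-- Streaming DataStream API (Scala binary version per scala.binary.version) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Needed to run/submit jobs, e.g. from the IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```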