flink伪分布式搭建及其本地idea测flink连接

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink伪分布式搭建及其本地idea测flink连接相关的知识,希望对你有一定的参考价值。

下载安装flink:
上传压缩包:flink-1.7.2-bin-scala_2.12.tgz
解压:tar -zxvf /flink-1.7.2-bin-scala_2.12.tgz -C ../hone
复制解压文件到子节点:
scp -r /home/flink-1.7.2/ root@slave1:/home/
scp -r /home/flink-1.7.2/ root@slave2:/home/
修改配置文件:选择一个master节点,配置conf/flink-conf.yaml
vi conf/flink-conf.yaml
设置jobmanager.rpc.address 配置项为该节点的IP 或者主机名
jobmanager.rpc.address: 10.108.4.202
然后添加子节点配置:
在所有的节点中:flink目录下:vi conf/slaves
添加所有子节点ip然后保存
启动本地的flink集群:
cd 到flink目录下
./bin/start-cluster.sh
查看webui:ip:8081
启动监听:nc -lk 9000
当报nc命令不存在时(yum install nc)
然后执行测试jar:
停止flink集群:bin/stop-cluster.sh
以集群方式提交任务:在flink目录下
./bin/flink run -m yarn-cluster -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar --port 9000

新建maven程序
pom.xml依赖如下:
然后新建一个TestSocketWindowWordCount类具体代码如下
然后启动flink集群->新建一个监听:nc -lk 6666
然后启动TestSocketWindowWordCount类
在linux监听页面输入代码
观察在idea控制台就有统计的输出
-------pom.xml开始-----------
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
-------pom.xml结束-----------
-------TestSocketWindowWordCount开始------------------
package com.gyb;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import javax.xml.soap.Text;

public class TestSocketWindowWordCount
public static void main(String args[])
String hostname = "192.168.198.130";
int port = 6666;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream text = env.socketTextStream(hostname, port, "\n");//获取执行环境
SingleOutputStreamOperator windowCounts = text
.flatMap(new FlatMapFunction<String, SocketWindowWordCount.WordWithCount>() br/>@Override
public void flatMap(String value, Collector<SocketWindowWordCount.WordWithCount> out)
for (String word : value.split("\s"))
out.collect(new SocketWindowWordCount.WordWithCount(word, 1L));


)
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(5))
.reduce(new ReduceFunction<SocketWindowWordCount.WordWithCount>()
br/>@Override
public SocketWindowWordCount.WordWithCount reduce(SocketWindowWordCount.WordWithCount a, SocketWindowWordCount.WordWithCount b)
return new SocketWindowWordCount.WordWithCount(a.word, a.count + b.count);

);

// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
//env.execute("Socket Window WordCount");
try 
    env.execute("Socket Window WordCount");
 catch (Exception e) 
    e.printStackTrace();

public static class WordWithCount 

    public String word;
    public long count;

    public WordWithCount() 

    public WordWithCount(String word, long count) 
        this.word = word;
        this.count = count;
    

    @Override
    public String toString() 
        return word + " : " + count;
    


-------TestSocketWindowWordCount结束------------------

以上是关于flink伪分布式搭建及其本地idea测flink连接的主要内容,如果未能解决你的问题,请参考以下文章

Flink伪分布式环境搭建及应用,Standlong(开发测试)

Flink1.8 集群搭建完全指南(1):Hadoop伪分布式

Flink的安装和部署--伪分布模式

Flink的安装和部署--伪分布模式

Flink 实时数仓伪分布虚拟机 (所有组件部署完成)

flink开发环境搭建maven环境搭建IDEA环境搭建