04 Flink cluster setup
Posted by 蓝风9
Preface
Heh, I've recently had a run of environment-setup tasks, so I'm writing them down here.
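The Flink cluster has three nodes: 192.168.110.150, 192.168.110.151 and 192.168.110.152.
150 is master, 151 is slave01 and 152 is slave02.
All three machines are set up for trusted shell access to one another (passwordless SSH).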
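The flink-conf.yaml files in step 2 refer to the nodes by hostname (master, slave01, slave02), so every node has to resolve those names. The article does not say how this was done; a common approach, assumed here, is an /etc/hosts entry on each node. The hypothetical helper below (the class name FlinkHosts is made up) just renders those lines from the IP/role mapping above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders /etc/hosts lines for the three-node layout.
// IPs and hostnames come from the article; using /etc/hosts for name
// resolution is an assumption.
class FlinkHosts {

    // IP -> hostname, in the order the nodes are listed
    static Map<String, String> nodes() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("192.168.110.150", "master");
        m.put("192.168.110.151", "slave01");
        m.put("192.168.110.152", "slave02");
        return m;
    }

    // One "IP hostname" line per node
    static String hostsLines(Map<String, String> nodes) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : nodes.entrySet()) {
            sb.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(hostsLines(nodes()));
    }
}
```

The printed lines would be appended to /etc/hosts on all three machines so that master, slave01 and slave02 resolve identically everywhere.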
Flink cluster setup
1. Basic environment preparation
On 192.168.110.150, 192.168.110.151 and 192.168.110.152, install a JDK and upload the Flink distribution package.
The package comes from Apache Downloads.
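After installing the JDK, it can be worth confirming on each node which Java version is actually picked up. The sketch below is a hypothetical pre-flight check (the class name JdkCheck and the Java 8 floor are assumptions, not from the article); it parses the java.version system property by hand so that it also runs on Java 8, where Runtime.version() is not available.

```java
// Hypothetical pre-flight check, run once on each node after the JDK install.
class JdkCheck {

    // "1.8.0_282" -> 8, "11.0.10" -> 11, "17" -> 17
    static int majorVersion(String v) {
        String[] parts = v.split("[._\\-+]");
        int first = Integer.parseInt(parts[0]);
        // Java 8 and earlier report "1.x"; Java 9+ put the major version first
        return first == 1 ? Integer.parseInt(parts[1]) : first;
    }

    public static void main(String[] args) {
        String v = System.getProperty("java.version");
        int major = majorVersion(v);
        System.out.println("java.version=" + v + ", major=" + major);
        if (major < 8) { // assumed minimum for this setup
            System.err.println("JDK looks too old for this Flink setup");
            System.exit(1);
        }
    }
}
```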
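2. Flink configuration
Update flink-conf.yaml (in the conf directory under the Flink home directory) on master, slave01 and slave02 as follows. The three files are identical except for taskmanager.host, which is set to each node's own hostname.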
master (192.168.110.150):
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: master
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
slave01 (192.168.110.151):
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave01
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
slave02 (192.168.110.152):
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave02
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
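Because the three files are edited by hand and copied between nodes, small slips (a dropped digit in a port, a memory size without a unit, a key left without a value) are easy to make. The following is a minimal, hypothetical lint for flat "key: value" lines; it is not Flink's own configuration parser, and the rules it applies are assumptions chosen to catch exactly those slips. Flink reads a memory size without a unit as bytes, which is why a unit-less *.memory.process.size is flagged.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sanity check for hand-edited flink-conf.yaml "key: value" lines.
class FlinkConfLint {

    static List<String> lint(String conf) {
        List<String> problems = new ArrayList<>();
        for (String raw : conf.split("\n")) {
            String line = raw.trim(); // also strips a trailing '\r'
            if (line.isEmpty() || line.startsWith("#")) continue;
            int colon = line.indexOf(':');
            if (colon < 0) {
                problems.add("not a 'key: value' line: " + line);
                continue;
            }
            String key = line.substring(0, colon).trim();
            String value = line.substring(colon + 1).trim();
            if (value.isEmpty()) {
                problems.add(key + " has no value");
            } else if (key.endsWith(".memory.process.size")
                    && !value.matches("\\d+(\\.\\d+)?\\s*[a-zA-Z]+")) {
                problems.add(key + " has no unit (would be read as bytes): " + value);
            } else if (key.endsWith(".port") && value.matches("\\d{1,9}")
                    && Integer.parseInt(value) < 1024) {
                problems.add(key + " is a privileged port (<1024): " + value);
            }
        }
        return problems;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: java FlinkConfLint <path-to-flink-conf.yaml>");
            System.exit(2);
        }
        String conf = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        List<String> problems = lint(conf);
        problems.forEach(System.out::println);
        System.exit(problems.isEmpty() ? 0 : 1);
    }
}
```

Running it against conf/flink-conf.yaml on each node before starting the cluster would, for example, report an empty taskmanager.numberOfTaskSlots line.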
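3. Starting the cluster
Of the three machines, master starts the JobManager, while slave01 and slave02 each start a TaskManager.
The scripts for this (jobmanager.sh and taskmanager.sh, each run with the start argument) are in the bin directory under the Flink home directory.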
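Since the machines trust each other over SSH, step 3 can also be driven entirely from master. The sketch below is hypothetical: the class name, the default install path /opt/flink and the --run flag are assumptions, not from the article. It builds one command per node (the JobManager locally, each TaskManager via ssh) and only executes them when --run is passed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical launcher for step 3, meant to be run on master.
class StartCluster {

    static List<List<String>> commands(String flinkHome, List<String> workers) {
        List<List<String>> cmds = new ArrayList<>();
        // JobManager runs locally on master
        cmds.add(Arrays.asList(flinkHome + "/bin/jobmanager.sh", "start"));
        // One TaskManager per worker, started over passwordless SSH
        for (String w : workers) {
            cmds.add(Arrays.asList("ssh", w, flinkHome + "/bin/taskmanager.sh", "start"));
        }
        return cmds;
    }

    public static void main(String[] args) throws Exception {
        String flinkHome = args.length > 0 ? args[0] : "/opt/flink"; // hypothetical path
        boolean execute = args.length > 1 && args[1].equals("--run");
        for (List<String> cmd : commands(flinkHome, Arrays.asList("slave01", "slave02"))) {
            System.out.println(String.join(" ", cmd));
            if (execute) {
                int code = new ProcessBuilder(cmd).inheritIO().start().waitFor();
                if (code != 0) throw new IllegalStateException("command failed: " + cmd);
            }
        }
    }
}
```

Printing the commands first (without --run) makes it easy to check the paths before anything is started.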
Testing the cluster
The Flink driver program used for the test is shown below; package it into a jar as-is.
package com.hx.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

/**
 * Test03SteamingWordCount
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2021-04-02 18:07
 */
public class Test03SteamingWordCount {

    public static void main(String[] args) throws Exception {
        // Create a streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Alternative (commented out): point the environment at the remote cluster explicitly
        // Configuration conf = new Configuration();
        // conf.setString(JobManagerOptions.ADDRESS, "192.168.110.150");
        // conf.setInteger(JobManagerOptions.PORT, 6123);
        // conf.setInteger(RestOptions.PORT, 8081);
        // conf.setString(DeploymentOptions.TARGET.key(), RemoteExecutor.NAME);
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        DataStreamSource<String> inputDs = env.addSource(new MySource());
        inputDs
                // split each line into words on whitespace
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        String[] splited = s.split("\\s+");
                        for (String str : splited) {
                            collector.collect(str);
                        }
                    }
                })
                // pair each word with an initial count of 1
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String s) throws Exception {
                        return new Tuple2<>(s, 1);
                    }
                })
                // group by the word
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                })
                // rolling sum of field 1 (the count) per word
                .sum(1)
                .addSink(new MySink());

        env.execute();
    }
}

// MySource: emits two fixed lines every 5 seconds until cancelled
class MySource implements SourceFunction<String> {

    // volatile so that cancel(), called from another thread, is seen by run()
    private volatile boolean running = true;

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> sourceContext) {
        while (running) {
            sourceContext.collect("234 345 123 346 234");
            sourceContext.collect("123 124");
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

// MySink: prints each (word, count) pair to the TaskManager's stdout
class MySink implements SinkFunction<Tuple2<String, Integer>> {

    @Override
    public void invoke(Tuple2<String, Integer> value, SinkFunction.Context context) {
        System.out.println(value.f0 + " - " + value.f1);
    }
}
Upload the driver program jar through the Flink web UI, then submit the job directly from there.
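The same submission can also be scripted against Flink's REST API, which the web UI itself uses: POST /jars/:jarid/run with the entry-class and parallelism query parameters. The sketch below assumes the jar has already been uploaded through the web UI and that its jar id (as listed by GET /jars) is passed as the first argument; the class name SubmitViaRest is hypothetical. It needs Java 11+ for java.net.http.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client: runs an already-uploaded jar via POST /jars/:jarid/run.
class SubmitViaRest {

    static String runUrl(String restBase, String jarId, String entryClass, int parallelism) {
        return restBase + "/jars/" + enc(jarId) + "/run"
                + "?entry-class=" + enc(entryClass)
                + "&parallelism=" + parallelism;
    }

    private static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: java SubmitViaRest <jar-id>");
            System.exit(2);
        }
        String url = runUrl("http://192.168.110.150:8081", args[0],
                "com.hx.test.Test03SteamingWordCount", 1);
        HttpRequest req = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{}")) // all body fields are optional
                .build();
        HttpResponse<String> resp = HttpClient.newHttpClient()
                .send(req, HttpResponse.BodyHandlers.ofString());
        // On success the JSON body carries the new job's id
        System.out.println(resp.statusCode() + " " + resp.body());
    }
}
```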
After the job is submitted, it can be monitored in the web UI.
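Besides the web UI, job status is exposed by the REST endpoint GET /jobs/overview, which returns a "jobs" array whose entries include a "state" field such as RUNNING or FINISHED. The sketch below (hypothetical class name) counts jobs in a given state; it uses a regex only to stay dependency-free, and a real client would be better served by a JSON parser.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical monitor: counts jobs in a given state from GET /jobs/overview.
class JobsOverview {

    static int countState(String json, String state) {
        Matcher m = Pattern.compile("\"state\"\\s*:\\s*\"" + Pattern.quote(state) + "\"").matcher(json);
        int n = 0;
        while (m.find()) n++;
        return n;
    }

    public static void main(String[] args) throws Exception {
        HttpRequest req = HttpRequest.newBuilder(
                URI.create("http://192.168.110.150:8081/jobs/overview")).GET().build();
        String body = HttpClient.newHttpClient()
                .send(req, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println("RUNNING jobs: " + countState(body, "RUNNING"));
    }
}
```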
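The driver program's standard output (the lines printed by MySink) goes to the stdout of the TaskManager running the sink, which can be viewed on that TaskManager's Stdout tab in the web UI.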
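Because MySink prints every updated (word, count) pair, the expected stdout can be worked out without a cluster. The sketch below is a plain-Java simulation, not Flink code (the class name is hypothetical), of flatMap, map, keyBy and sum(1) at parallelism 1: each word bumps its per-key running total, and the new total is emitted immediately, as a rolling sum does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the word-count pipeline (no Flink involved).
class WordCountSimulation {

    static List<String> simulate(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // per-key running totals
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {              // flatMap
                int count = state.merge(word, 1, Integer::sum);   // (word, 1) + rolling sum
                out.add(word + " - " + count);                    // MySink's print format
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One round of MySource output
        List<String> input = Arrays.asList("234 345 123 346 234", "123 124");
        simulate(input).forEach(System.out::println);
        // prints: 234 - 1, 345 - 1, 123 - 1, 346 - 1, 234 - 2, 123 - 2, 124 - 1 (one per line)
    }
}
```

On the cluster the counts keep growing with every 5-second round, since the sum state is never reset.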
Done.