25.Flink监控什么是MetricsMetrics分类Flink性能优化的方法合理调整并行度合理调整并行度Flink内存管理Spark VS Flink时间机制容错机制等

Posted to.to

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了25.Flink监控什么是MetricsMetrics分类Flink性能优化的方法合理调整并行度合理调整并行度Flink内存管理Spark VS Flink时间机制容错机制等相关的知识,希望对你有一定的参考价值。

25.Flink监控
25.1.什么是Metrics
25.2.Metrics分类
25.2.1.Metric Types
25.2.2.代码
25.2.3.操作
26.Flink性能优化
26.1.复用对象
26.2.数据倾斜
26.3.异步IO
26.4.合理调整并行度
27.Flink内存管理
28.Spark VS Flink
28.1.应用场景
28.2.API
28.3.核心角色/流程原理
28.3.1.spark
28.3.2.Flink
28.4.时间机制
28.5.容错机制
28.6.窗口
28.7.整合Kafka
28.8.其他的
28.9.单独补充:流式计算实现原理
28.10.单独补充:背压/反压
28.10.1.back pressure

25.Flink监控

https://blog.lovedata.net/8156c1e1.html

25.1.什么是Metrics

由于集群运行后很难发现内部的实际状况,跑得慢或快,是否异常等,开发人员无法实时查看所有的Task日志,比如作业很大或者有很多作业的情况下,该如何处理?此时Metrics可以很好的帮助开发人员了解作业的当前状态。

Flink提供的Metrics可以在Flink内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。

25.2.Metrics分类

25.2.1.Metric Types

1.常用的如Counter, 写过mapreduce作业的开发人员就应该很熟悉Counter, 其实含义都是一样的,就是对一个计数器进行累加,即对于多条数据和多兆数据一直往上加的过程。
2.Gauga,Gauge是最简单的Metrics,它反映一个值。比如要看现在Java heap内存用了多少,就可以每次实时的暴露一个Gauge, Gauge当前的值就是heap使用的量。
3.Meter, Meter是指统计吞吐量和单位时间内发生”事件”的次数。它相当于求一种速率,即事件次数除以使用的时间。
4.Histogram,Histogram比较复杂,也并不常用,Histogram用于统计一些数据的分布,比如说Quantile、Mean、StdDev、Max、Min等。

Metric在Flink内部有多层结构,以Group的方式组织,它并不是一个扁平化的结构,Metric Group + Metric Name是Metrics的唯一标识。

25.2.2.代码

package day6;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author tuzuoquan
 * @date 2022/6/22 9:26
 */
public class MetricsDemo 

    public static void main(String[] args) throws Exception 
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("node1", 9999);

        //TODO 2.transformation
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() 
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception 
                String[] arr = value.split(" ");
                for (String word : arr) 
                    out.collect(word);
                
            
        );

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words
                .map(new RichMapFunction<String, Tuple2<String, Integer>>() 
                    //用来记录map处理了多少个单词
                    Counter myCounter;

                    //对Counter进行初始化
                    @Override
                    public void open(Configuration parameters) throws Exception 
                        myCounter = getRuntimeContext().getMetricGroup().addGroup("myGroup").counter("myCounter");
                    
                    //处理单词,将单词记为(单词,1)
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception 
                        myCounter.inc();//计数器+1
                        return Tuple2.of(value, 1);
                    
                );

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);

        //TODO 3.sink
        result.print();

        //TODO 4.execute
        env.execute();
    


运行命令:

// /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
// /export/server/flink/bin/flink run --class cn.xxx.metrics.MetricsDemo /root/metrics.jar
// 查看WebUI

25.2.3.操作

1.打包
2.提交到Yarn上运行
3.查看监控指标

4.也可以通过浏览器f12的找到url发送请求获取监控信息

5.也可以通过代码发送请求获取监控信息

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;

public class MetricsTest 
    public static void main(String[] args) 
        //String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5/vertices/cbc357ccb763df2852fee8c4fc7d55f2/metrics?get=0.Map.myGroup.myCounter");
        String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5");

        System.out.println(result);
    

    public static String sendGet(String url) 
        String result = "";
        BufferedReader in = null;
        try 
            String urlNameString = url;
            URL realUrl = new URL(urlNameString);
            URLConnection connection = realUrl.openConnection();
            // 设置通用的请求属性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立实际的连接
            connection.connect();
            in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) 
                result += line;
            
         catch (Exception e) 
            System.out.println("发送GET请求出现异常!" + e);
            e.printStackTrace();
        
        // 使用finally块来关闭输入流
        finally 
            try 
                if (in != null) 
                    in.close();
                
             catch (Exception e2) 
                e2.printStackTrace();
            
        
        return result;
    


26.Flink性能优化

26.1.复用对象

stream.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() 
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception 
            long changesCount = ...
            // A new Tuple instance is created on every execution
            collector.collect(new Tuple2<>(userName, changesCount));
        
    

上面的代码可以优化为下面的代码:

可以避免Tuple2的重复创建

stream.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() 
    // Create an instance that we will reuse on every call
    private Tuple2<String, Long> result = new Tuple<>();
    @Override
    public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception 
        long changesCount = ...
        // Set fields on an existing object instead of creating a new one
        result.f0 = userName;
        // Auto-boxing!! A new Long value may be created
        result.f1 = changesCount;
        // Reuse the same Tuple2 object
        collector.collect(result);
    

26.2.数据倾斜

rebalance

自定义分区器

key+随机前后缀

26.3.异步IO

26.4.合理调整并行度

数据过滤之后可以减少并行度

数据合并之后再处理之前可以增加并行度

大量小文件写入到HDFS可以减少并行度
1.ds.writeAsText(“data/output/result1”).setParallelism(1);
2.env.setParallelism(1);
3.提交任务时webUI或命令行参数 flink run -p 10
4.配置文件flink-conf.yaml parallelism.default: 1


更多的优化在后面的项目中结合业务来讲解

27.Flink内存管理

1.减少full gc时间:因为所有常用数据都在Memory Manager里,这部分内存的生命周期是伴随TaskManager管理的而不会被GC回收。其他的常用数据对象都是用户定义的数据对象,这部分会快速的被GC回收。
2.减少OOM:所有的运行的内存应用都从池化的内存中获取,而且运行时的算法可以在内存不足的时候将数据写到堆外内存。
3.节约空间:由于Flink自定序列化/反序列化的方法,所有的对象都以二进制的形式存储,降低消耗。
4.高效的二进制操作和缓存友好:二进制数据以定义好的格式存储,可以高效地比较与操作。另外,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对CPU高效缓存更友好,可以从CPU的L1/L2/L3缓存获取性能的提升,也就是Flink的数据存储二进制格式符合CPU缓存的标准,非常方便被CPU的L1/L2/L3各级别缓存利用,比内存还要快!

28.Spark VS Flink

28.1.应用场景

Spark:主要用作离线批处理 , 对延迟要求不高的实时处理(微批) ,DataFrame和DataSetAPI 也支持 “流批一体”
Flink:主要用作实时处理 ,注意Flink1.12开始支持真正的流批一体

28.2.API

Spark : RDD(不推荐) /DSteam(不推荐)/DataFrame和DataSet
Flink : DataSet(1.12软弃用) 和 DataStream /Table&SQL(快速发展中)

28.3.核心角色/流程原理

28.3.1.spark


28.3.2.Flink





28.4.时间机制

Spark: SparkStreaming只支持处理时间 StructuredStreaming开始支持事件时间
Flink: 直接支持事件时间 / 处理时间 /摄入时间

28.5.容错机制

Spark : 缓存/持久化 +Checkpoint(应用级别) StructuredStreaming中的Checkpoint也开始借鉴Flink使用Chandy-Lamport algorithm分布式快照算法

Flink: State + Checkpoint(Operator级别) + 自动重启策略 + Savepoint

28.6.窗口

Spark中的支持基于时间/数量的滑动/滚动 要求windowDuration和slideDuration必须是batchDuration的倍数

Flink中的窗口机制更加灵活/功能更多

支持基于时间/数量的滑动/滚动 和 会话窗口

28.7.整合Kafka

SparkStreaming整合Kafka: 支持offset自动维护/手动维护 , 支持动态分区检测 无需配置

Flink整合Kafka: 支持offset自动维护/手动维护(一般自动由Checkpoint维护即可) , 支持动态分区检测 需要配置

//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
props.setProperty("flink.partition-discovery.interval-millis","5000");

28.8.其他的

源码编程语言

Flink的高级功能 : Flink CEP可以实现 实时风控…

28.9.单独补充:流式计算实现原理

Spark :
SparkStreaming: 微批
StructuredStreaming: 微批(连续处理在实验中)

Flink : 是真真正正的流式处理, 只不过对于低延迟和高吞吐做了平衡
早期就确定了后续的方向:基于事件的流式数据处理框架!

env.setBufferTimeout - 默认100ms
taskmanager.memory.segment-size - 默认32KB

28.10.单独补充:背压/反压

28.10.1.back pressure

Spark: PIDRateEsimator ,PID算法实现一个速率评估器(统计DAG调度时间,任务处理时间,数据条数等, 得出一个消息处理最大速率, 进而调整根据offset从kafka消费消息的速率)。

Flink: 基于credit – based 流控机制,在应用层模拟 TCP 的流控机制(上游发送数据给下游之前会先进行通信,告诉下游要发送的blockSize, 那么下游就可以准备相应的buffer来接收, 如果准备ok则返回一个credit凭证,上游收到凭证就发送数据, 如果没有准备ok,则不返回credit,上游等待下一次通信返回credit)

阻塞占比在 web 上划分了三个等级:
OK: 0 <= Ratio <= 0.10,表示状态良好;
LOW: 0.10 < Ratio <= 0.5,表示有待观察;
HIGH: 0.5 < Ratio <= 1,表示要处理了(增加并行度/subTask/检查是否有数据倾斜/增加内存…)。

例如,0.01,代表着100次中有一次阻塞在内部调用

以上是关于25.Flink监控什么是MetricsMetrics分类Flink性能优化的方法合理调整并行度合理调整并行度Flink内存管理Spark VS Flink时间机制容错机制等的主要内容,如果未能解决你的问题,请参考以下文章

25.Flink监控什么是MetricsMetrics分类Flink性能优化的方法合理调整并行度合理调整并行度Flink内存管理Spark VS Flink时间机制容错机制等

乐维百科:什么是IT监控?为什么运维需要监控?

乐维百科:什么是IT监控?为什么运维需要监控?

乐维百科:什么是IT监控?为什么运维需要监控?

一天之内你必然能知道什么是监控

一天之内你必然能知道什么是监控