唯品会SPARK3.0升级之路

Posted 唯技术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了唯品会SPARK3.0升级之路相关的知识,希望对你有一定的参考价值。

导读

唯品会离线平台SPARK2.3.2无缝升级到SPARK3.0.1版本,完全做到了对用户透明,目前正按着既定方案进行升级,新的版本SPARK CORE/SQL/PySpark进行了优化和BugFix,并且Merge了SPARK vip 2.3.2 重要Patch,在性能和易用性上比旧版本都有较大提升。这篇文章介绍了我们升级SPARK过程中遇到的挑战和思考,希望能给大家带来启发。 


Spark应用现状

本次版本升级之前,唯品会大数据平台使用的主要版本为SPARK2.3.2,并且在社区版本上做了增强和BugFix,为用户提供SparkSQL/SparkJar/PySpark/SparkStream/Spark ML。现在集群有2300物理机,每天有1.2w SPARK 定时作业,8w个实例 在YARN上运行,Spark Adhoc日查询次数9000。


SPARK在公司推荐使用效果不错,作为批处理默认引擎。从下面图反映SPARK在唯品会大数据应用有半壁江山。

图:离线平台SPARK/HIVE个数对比   

唯品会SPARK3.0升级之路

图:离线平台SPARK作业个数对比  

唯品会SPARK3.0升级之路

图:离线平台SPARK/PRESTO/HIVE ADHOC查询次数对比   

唯品会SPARK3.0升级之路

图:AI平台各组件应用个数对比

SPARK3.0特性和升级背景 

Spark SQL是3.0.1中最侧重组件。已解决ISSUE 46%用于Spark SQL。使基于SQL高级别的库(包括结构化流和MLlib)和更高级别的API包括SQL和DataFrames)受益。这也迎合我们现在的主要场景(我们现在90%是SQL),同时也是优化痛点和主要功能点。


PySpark现在是Spark非常活跃的模块。此版本改善了功能和可用性,包括使用Python类型提示重新设计了Pandas UDF API,新的Pandas UDF类型以及更多的Python错误处理。这为我们新的战场比如推荐,特征工程,下一代AI平台等项目提供更好支持。


Spark 3.0中的功能亮点:自适应查询执行;动态分区修剪;符合ANSI SQL;Pandas API的重大改进;用于结构化流的新UI;加速器感知调度器;增强测试和SQL参考文档。达到了增效降本的目的。

唯品会SPARK3.0升级之路

 图:3.0.1特性ISSUE模块分布 

唯品会SPARK3.0升级之路

图:Spark2.3.2的缺点

#
目标
现状
痛点
解决方案(3.0特性支持)
背景1

智能优化

性能提升

▲自动合并小文件

仅仅自动调整shuffle partition数

主要基于规则优化

优化靠资源堆砌

主要人工分析: 1.倾斜分析靠人工和加盐处理;2.Broadcast Join靠经验判断;3.读写和混洗并发靠人为分析指定

手工调参

被动优化

只能基于规则优化

靠资源堆砌

集群资源有限

有些场景加了资源效果不佳(倾斜)

自适应查询执行:1.shuffle partition自动调整;2.动态查询重用;3.本地shuffle读;4.join倾斜自动优化;5.连续读shuffle优化 

动态分区优化

其他规则优化

最小化表缓存同步成本

将聚合代码拆分为小函数

在INSERT和ALTER TABLE Add PARTITION中添加批处理

允许聚合器注册为UDAF

背景2 基础依赖升级

目前线上HIVE为2.3.4

SPARK2.x依赖HIVE为1.2.1

spark2.x依赖Hadoop2.6

Hadoop线上为3.2

HIVE和SPARK函数和语法兼容性问题

线上场景多,差异突显就更多

Spark2.x升级Hadoop3.x费时费力

Spark2.x和Hadoop有第三方依赖冲突

SPARK3.0依赖2.3.7 HIVE

升级后线上SPARK依赖HIVE和HIVE基准都为2.3

Hadoop 3 support

Java 11 support 

GA Scala 2.12 and remove 2.11

背景3 数据湖

Spark2.x通过API操作delta lake 

实时增量入仓(替换离线抽取装载)

缺失SQL on  delta lake

易用性差

Merge性能低

数据湖SQL支持

SPARK3.0性能加强给delta lake带来性能提升

背景4 数据源增强

Orc默认存储格式

SPARK2.X支持Orc/Parquet/sequence/text/csv/jdbc/json

列裁剪不确定表达式失效

嵌套子字段下推失效

CSV下推失效

parquet/ ORC:  1.析取谓词的下推;2.通用化嵌套列修剪;3.嵌套字段的parquet谓词下推(仅限parquet);4.支持ORC的合并模式(仅限ORC)5.;ORC的嵌套模式修剪(仅限ORC);6.减少ORC的谓词转换复杂度(仅限ORC)

Kafka:  1.增加对Kafka报头的支持;2.在Kafka源代码中引入新的选项:3.时间戳偏移(开始/结束);4.支持Kafka批处理源和流处理源v1中的minPartitions选项;5.升级Kafka到2.4.1

背景5
pyspark增强

PySpark2.x UDF多接口实现

PySpark2.x错误提示词不达意

pyspark应用越来越广但分析维护难

重新设计的带有类型提示的pandas udf

允许Pandas UDF接受pd.DataFrames的迭代器

支持StructType作为标量pandas UDF的参数和返回类型

通过Pandas udf支持dataframe Cogroup

添加mapInPandas来允许数据帧的迭代器

使PySpark SQL异常更加python化

背景6
spark on cloud

Stream on k8s

ML on k8s

Spark on k8s功能不完善

driver和executor的日志收集困难

缺乏Spark SQL on k8s

缺乏STS on k8s

支持用户指定的driver和executor pod模板

允许没有外部shuffle服务的动态分配

使用k8s进行响应更灵敏的动态分配

与Hadoop兼容的文件系统支持客户端依赖关系

支持Kubernetes的子路径安装

背景7

功能增强

只支持broadcast join Hints

只支持API REPARTITIONS Hints by column

Catalyst只支持SELECT/INSERT

SQL支持Hints不全

Catalyst不支持DML

已有些函数性能差

引入完整Hints join语法

SQL支持REPARTITION BY hint 

Thrift Server中元数据处理

在Catalyst支持删除/更新/合并操作符

新的35个内建函数

对现有内置函数的改进

背景8
监控/调试增强

SHS日志大且杂乱

SHS加载日志性能差

metric信息不全

UI信息不多

metric信息不够

分析问题困难

测试手段不多

新的结构化流UI

SHS:允许滚动流应用程序的事件日志

SQL exchange操作符添加shuffle度量

提高历史服务器的并发性能

解释格式化命令

改进SQL解析器的错误消息

向度量系统添加执行器度量和内存使用测量

升级Vip SPARK3.0

主要升级节点(如下图)

唯品会SPARK3.0升级之路

合并HIVE VIP1.2.1 patch到2.3.7

#
Patch描述
1 修复Spark3.0能删非默认hdfs namespace表的目录
2 Spark3.0加载vip数据平台函数
3 屏蔽Hive audit log日志
4 解决函数冲突问题

合并SPARK VIP2.x patch到3.0.1

#
Patch描述
1
小文件合并
2 shuffle文件准实时清理,防止nodemanager磁盘压力大
3 Thrift server 删临时目录 bugfix
4 Thrift server支持代理用户
5 支持Hive权限检验
6 Thrift server 超时杀任务
7 支持bucket 超集
8
支持Spark bucket(datasource)读写同一目录
9

支持tf/es/kudu/alluxio/delta到2.1

灰度上线spark3.0和bugfix

线上作业灰度计划

*注解p0/p1/p2/p3 为作业重要度级别,以此递减(下图为灰度作业流程)


bugfix和兼容线上2.3.2

灰度线上作业发现BUG不少,但BUG分优先级,优先级高先处理。

灰度线上作业和不断bugfix是一个迭代过程,不断完善和修复因为SPARK3.0上线导致线上问题。

# bug/兼容性问题
解决方案
1

分号在 -- 注释后面bug,导致SQL执行失败

在spark sql语句 每个分号加上换行符
2

数据存储时,不对数据类型和列类型进行校验

spark.sql.storeAssignmentPolicy默认值改为LEGACY

3

stage 和 推测task 映射关系不存在报错bug

先判断映射关系存在,再从map去掉结束task和stage映射关系
4

java.text.SimpleDataFormat用于以区域设置敏感的方式格式化和解析日期/时间戳,默认值是EXCEPTION,当我们得到不同的结果时抛出RuntimeException

set spark.sql.legacy.timeParserPolicy默认值改为LEGACY

5 执行SQL文件,中间某段SQL报错,不会马上退出整个执行 SQL顺序执行, 中间有结果状态不为0,直接将状态返回给driver直接退出
6 stage retry多个active taskSetManager在运行 stage retry,设置历史taskSetManager为Zombie状态
7

静态插入 UI,显示动态分区个数不为0 ,只影响ui显示,不影响执行结果


8 Spark ui 作业结束,但是 history sql 还有显示running job,不影响执行结果

.挑战和应对

内部 patch 如何兼容

挑战:Spark2.x 自研功能比如: 小文件合并;支持权限检验;STS 超时杀任务;Shuffle 文件动态清理等都是是基于2.x打的patch,但是3.0相比2.x整个核心变化比较大, merge到3.0难度增大,测试和灰度工作量增加。

应对:有些patch基于3.0是重新开发一次。


SQL 语法兼容 

挑战:历史包袱, 线上用户SQL掌握熟练程度不一样,不规范的SQL大有存在,2.x对语法没有那么强制,但3.0默认相对2.x严格一点,这样对灰度线上作业阻力巨大,对保障SLA有冲击。比如: 表不存在删表;数据插入数据列类型和元数据字段是否检验;敏感日期格式化;SQL文件设置非SQL参数报错等。

应对:大部分是兼容2.x版本,少数修改作业兼容3.0.


新版本BUG不少

挑战:灰度线上作业出现十几个bug,比如: 注释后面分号无法识别;stage和推测task对应关系找不到;执行SQL文件,中间某段SQL报错,不会立刻退出整个执行,导致数据质量问题;metric 在application结束之后没有complete等。

应对:及时快速Bugfix。


线上作业灰度工作量大

挑战:线上作业类型(批处理、流处理、机器学习、adhoc、pyspark和sparkjar)多,作业量大(线上有1.2w个作业),重要业务,核心全链路必须保障SLA 等情况下, 灰度作业压力不小。而且出现新的bug, 压力就倍增。

应对:必须停止灰度,快速BugFix再灰度。

总结

平滑升级,SPARK组件每次升级都需要做到尽量对用户毫无感知(后台修改spark_version切换),升级,灰度,回滚,监控都是平台自动统一处理;

升级充分准备,对新版本的憧憬和敬畏都不可或缺,升级之前要对社区新版本的feature和BUG要有足够的认知,甚至对升级版本和最新版本之间的issue也要有足够的了解,因为后面版本issue很大一部分是对前面的版本进行优化和bugfix,这样才能做到心中有数,比如3.1已经修复不少3.0bug;


升级策略,社区重大版本一般与线上版本核心架构差异很大,比如3.0加入自适应导致spark sql模块和shuffle service服务发生重大变化,直接在2.3.2基础上 merge 3.0难度较大,风险也高,建议3.x基础之上merge 2.3.2的patch进行升级;


真理来源于实践,从2.1.1升级到2.3.2为本次2.3.2升级3.0积累不少的经验和套路,为本次升级少走弯路和保障SLA;


吸取社区,本次升级完3.0不是终点,Spark社区非常活跃,3.1已经出来了,中间有很多非常好的issue, 都需要merge vip SPARK 3.0, SPARK优化永无止境;


回馈社区,近期打算把VIP SPARK main Patch 比如: 小文件合并;支持权限检验;STS 超时杀任务;Shuffle 文件动态清理 等提交社区。


欢迎投稿!!

只要是技术相关的文章尽管砸过来!


以上是关于唯品会SPARK3.0升级之路的主要内容,如果未能解决你的问题,请参考以下文章

唯品会的Service Mesh三年进化史

唯品会能用微信支付吗

网页版唯品会怎么收藏单件商品

在唯品会上的未支付订单最后是怎么处理的

唯品会HDFS性能挑战和优化实践

雄起的特卖电商——唯品会