在DataPhin基于PySpark实现主键重复就自动失败以提高运维的半自动化水平
Posted 虎鲸不是鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在DataPhin基于PySpark实现主键重复就自动失败以提高运维的半自动化水平相关的知识,希望对你有一定的参考价值。
在DataPhin基于PySpark实现主键重复就自动失败以提高运维的半自动化水平
背景
我们的离线数仓项目,在中台有个Hive维度表,由于SQL Boy们的历史局限性,导致了该定时HQL任务运行后概率性出现主键重复的问题。当然主键重复了,后续的HQL任务们依然能够照常从该维度表取数及进行各种性能极烂的Join操作,跑出很多错误的数据。由于种种不便透露的原因,这种主键重复的问题并不能通过简单的修改HQL任务运算逻辑来解决,只能人工检查是否有主键重复的现象→手动查出重复数据→手动确认正确的一条数据→手动处理该维度表。由于是纯手动的,就需要专人值守夜班周末班,如果当天的主键没有重复就可以睡个安稳觉,否则还得蹲点到HQL任务刚开始运行就手动kill。这种纯手打方式,显然对运维监控人员极不友好。
SQL Boy们想要用纯SQL的方式实现主键重复就自动失败的功能,无果,夜班周末班好一阵子后最终还是屈服了。于是笔者使用PySpark实现了一个主键重复就自动失败的任务,将不堪其扰的SQL Boy们解放出来。
需求
根据维度表的取数逻辑自动检测主键是否重复,如果主键重复就自动失败,等上班时间由专人去手动处理该任务并手动吊起下游任务。如果主键没有重复就正常退出。
实现原理
当租用的阿里云DataPhin中台及依赖的CDP环境配置完善后,可以使用Spark On Hive的sparkSQL,为了方便SQL Boy们修改取数逻辑,我们用Python比Java和Scala要多一些【性能一塌糊涂,唯一的好处是不需要编译当然也就方便修改SQL】。基于PySPark可以操作算子读Hive表以获取DataFrame及做常规的计算。只需要根据SQL Boy们提供的取数HQL就可以拿到维度表的数据,之后单独拎出主键count出数据量,再与distinct去重后的数据量做比较。如果数据量不一致【毫无疑问只能是去重后的数据量更少】,就想办法让任务跑失败,否则成功。
脱敏后的PySpark代码
#! /bin/bash
# 笔者习惯这里写一点注释
echo "START"
cat zhiyong_pre_check_full_daily.py <<EOF
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyspark.sql import SparkSession
spark=SparkSession \\
.builder \\
.appName("zhiyong_pre_check_full_daily") \\
.config("hive.exec.dynamic.partition","true") \\
.config("hive.exec.dynamic.partition.mode","nonstrict") \\
.getOrCreate()
sql1="""
select 主键 from(
这里写SQL Boy们提供的SQL
) temp
"""
df1=spark.sql(sql1)
df1.show()
count1=df1.count()
df2=df1.distinct()
df2.show()
count2=df2.count()
res=1
if count1==count2:
res=1/1
else:
res=0/0
EOF
echo "END"
spark-submit --master yarn --deploy-mode client --driver-memory 4G --executor-memory 4G --executor-cores 1 --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutor=10 --conf spark.dynamicAllocation.maxExecutor=50 --conf spark.memory.fraction=0.95 --conf spark.shuffle.service.enabled=true --conf spark.ui.port=4180 --conf spark.port.maxRetries=128 --conf spark.rpc.timeout=600s --conf spark.debug.maxToStringFields=4096 --conf spark.sql.crossJoin.enabled=true --conf spark.sql.broadcastTimeout=600s --conf spark.sql.autoBroadcastJoinThreshold=-1 zhiyong_pre_check_full_daily.py
让任务失败,笔者的做法简单粗暴:使用0作为分母。
总结
事实上,这么做只是实现了半自动。按照SQL Boy们只会SQL并妄想把所有需求都用纯SQL方式实现的迂腐思想,离真正意义上的自动化运维还有相当长的一段路要走。
转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/127485287
以上是关于在DataPhin基于PySpark实现主键重复就自动失败以提高运维的半自动化水平的主要内容,如果未能解决你的问题,请参考以下文章
Dataphin核心功能:安全——基于数据权限分类分级和敏感数据保护,保障企业数据安全
PySpark。如何确保每日增量数据在 HIVE 中没有重复的 UUID 作为 PK
基于pyspark中仅一列的两个DataFrame之间的差异[重复]