Kafka积压造成的数据合并错误解决方式

Posted 不会Hive的啊扬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka积压造成的数据合并错误解决方式相关的知识,希望对你有一定的参考价值。

目的:为了解决kafka数据积压而造成的影响数据合并准确性问题

方式:用check1和check2两个脚本,check1只启动一个,做判断并给值,如果当前表有今天的ts说明可以合并昨天数据,则标志位设置为1,否则设置为0,如果满足条件的表到一定比例就可以大致确定没有积压,数据合并是没有问题的,则填入一个表名为all的表,此时所有表都可参与合并工作。check2有多少个ods表就启动多少个,查询check1所输出的表,判断是否执行ods合并,或等到一个表名为all的表,强制其开始合并。

  1. 建表语句

CREATE EXTERNAL TABLE tmp.check1 (
     tablename STRING,
     check1 INT,
     ctime STRING) 
PARTITIONED BY (dt STRING) 
STORED AS PARQUET;

说明:

每天一个分区,check1记录当天合并前每张表的最终状态值(是否有当天凌晨ts),为1或0,ctime记录合并前最后一次赋值入表时的时间

  1. check1

#!/bin/bash
 
value=0.8
time=03
 
while ((1))
do
        hive -e "
        set mapreduce.map.memory.mb=512;
        set mapreduce.reduce.memory.mb=10240;
        set mapred.max.split.size=300000000;
        set mapred.min.split.size.per.node=300000000;
        set mapred.min.split.size.per.rack=300000000;
        set mapred.reduce.tasks=1;
        set hive.exec.dynamic.partition.mode=nonstrict;
        insert overwrite table tmp.check1 partition(dt) 
        select \\`table\\`, if(from_unixtime(bigts, 'yyyy-MM-dd')=dt, 1, 0), from_unixtime(unix_timestamp()), current_date() dt
        from
        (
        select \\`table\\`, dt, max(cast(get_json_object(content,'$.ts') as bigint)) bigts
        from ods_binlog.ods_binlog where dt = current_date() group by \\`table\\`,dt
        )t
        "
 
        QUAN=$(hive -S -e "
        select count(if(check1=1, 1, null))/count(*) from tmp.check1
        ")
 
        if [[ `echo "$QUAN > $value" | bc` -eq 1 || $(date "+%Y-%m-%d-%H") = $(date "+%Y-%m-%d")-$time ]]
        then
                hive -e"
                set mapreduce.map.memory.mb=512;
                set hive.exec.dynamic.partition.mode=nonstrict;
                insert into table tmp.check1 partition(dt) 
                select 'ALL', 1, from_unixtime(unix_timestamp()), current_date() dt
                "
                exit
        fi
 
        sleep 1m
done

说明:

1.value为全部开始合并的比例阈值,例如有十个表,九个表已经有ts为今天Binlog,则比例为0.9,若阈值为0.8,则剩余的一个表也开始合并,

该参数用来避免有些表变化小一直没有今天Binlog而陷入死等状态

2.time为等待的最大忍受限度,例如有十个表,七个表已经有ts为今天Binlog,则比例为0.7,达不到阈值0.8,假如time为凌晨三点,只要过了这个点,剩余的三个表也开始合并,

该参数用来避免哪天突发情况,阈值设置不合适导致的死等状态,是value的补充

补充:

1.QUAN=$(hive -S -e "sql语句")可以将hive-e产生的结果集返回给变量QUAN

2.table为分区名,但也为hive中的关键字,需要使用飘号引起来,但因为飘号也为shell中包起来命令的关键字,为避免冲突,飘号前使用反斜杠\\`table\\`

3.group by分组后,select中只能是分组字段或聚合函数,如果想要对max值在判断,应在外边嵌套一层

4.浮点数比较大小应该先通过管道符传给bc并用飘号引起来`echo "$QUAN > $value" | bc`,返回结果为真则返回1,否则返回0,再和1做等值整型eq判断即可

5.shell中日期函数为date后面可接格式化类型date "+%Y-%m-%d-%H",加号写在外面效果一样,此时返回格式为年-月-日-时用$()引起来后可做变量的等值判断,后面可拼接其他字符串

6.如果if条件里有且或关系等,应该用[[ ]]双层中括号,while ((1))为死循环,sleep 1m为睡眠一分钟,每隔1m重新写一次分区表

  1. check2

#!/bin/bash
 
tablename=$1
 
while ((1))
do
        QUAN=$(hive -S -e "
        select check1 from tmp.check1 where (tablename = 'ods_binlog_$tablename_di' or tablename = 'ALL') and dt = current_date()        ")
 
        for check in $QUAN[*]
        do
                if [ $check -eq 1 ]
                then
                        echo "退出"
                        exit
                fi
        done
        echo "下次循环"
        sleep 1m
done

说明:

tablename为脚本传入参数,因为内部有拼接前后缀,故只需要传入例如account_avatar_accountitems即可,就可以做该ods表是否开始合并的判断

补充:

tablename为脚本传入参数,因为内部有拼接前后缀,故只需要传入例如account_avatar_accountitems即可,就可以做该ods表是否开始合并的判断

补充:

1.因为我们的输出是双行单列的,故不牵扯取第几列的问题,如果多行多列又要取第二列,则QUAN=$(hive -S -e "sql语句" | cut -d' ' -f 2),意为每行都按制表符切分,取第二列,需要注意不能用\\t,

由于sql输出的结果集不同列之间是制表符,而默认cut就是按照制表符切分的,所以此时-d参数可以省略

2.对于变量接受到结果集的遍历,应当使用for check in$QUAN[*],类似于java的增强for循环,为什么要遍历所有,因为该表可能返回标志位为0,但ALL作为保底可能此时返回的是1,此时就会有0和1两个结果,我们不能盲目的只取第一个结果

[*]代表依次遍历结果集中所有行,[2]代表只遍历下标为2的行,不写方括号默认下标为0的行,forcheck in QUAN则会传入值"QUAN"给check

以上是关于Kafka积压造成的数据合并错误解决方式的主要内容,如果未能解决你的问题,请参考以下文章

Flink 消费Kafka每日不定时积压(非重启不能解决)问题排查解决

2021年大数据Kafka:❤️Kafka的消费者负载均衡机制和数据积压问题❤️

2021年大数据Kafka:❤️Kafka的消费者负载均衡机制和数据积压问题❤️

spark改七行源码实现高效处理kafka数据积压

Kafka 消费者之消费方式工作流程消费者案例(订阅主题订阅分区)消费者组案例分区的分配以及再平衡offset 位移消费者事务数据积压(消费者如何提高吞吐量)

kafka专栏消费者组数据积压的查看与处理方法