使用spark对hive表中的多列数据判重

Posted 扎心了老铁

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用spark对hive表中的多列数据判重相关的知识,希望对你有一定的参考价值。

本文处理的场景如下,hive表中的数据,对其中的多列进行判重deduplicate。

 

1、先解决依赖,spark相关的所有包,pom.xml

spark-hive是我们进行hive表spark处理的关键。

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.19</version>
        </dependency>
    </dependencies>

 

2、spark-client

package com.xiaoju.kangaroo.duplicate;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

import java.io.Serializable;

public class SparkClient implements Serializable{
    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;

    public SparkClient() {
        initSparkConf();
        javaSparkContext = new JavaSparkContext(sparkConf);
    }


    public SQLContext getSQLContext() {
        return new SQLContext(javaSparkContext);
    }

    public HiveContext getHiveContext() {
        return new HiveContext(javaSparkContext);
    }

    private void initSparkConf() {
        try {
            String warehouseLocation = System.getProperty("user.dir");
            sparkConf = new SparkConf()
                    .setAppName("duplicate")
                    .set("spark.sql.warehouse.dir", warehouseLocation)
                    .setMaster("yarn-client");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}

3、判重流程

package com.xiaoju.kangaroo.duplicate;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SparkDuplicate implements Serializable  {

    private transient SparkClient sparkClient;
    private transient HiveContext hiveContext;
    private String db;
    private String tb;
    private String pt;
    private String cols;

    public SparkDuplicate(String db, String tb, String pt, String cols) {
        this.db = db;
        this.tb = tb;
        this.pt = pt;
        this.cols = cols;
        this.sparkClient = new SparkClient();
        this.hiveContext = sparkClient.getHiveContext();
    }

    public void duplicate() {
        String partition = formatPartition(pt);
        String query = String.format("select * from %s.%s where %s", db ,tb, partition);
        System.out.println(query);
        DataFrame rows = hiveContext.sql(query);
        JavaRDD<Row> rdd = rows.toJavaRDD();
        Map<String, Integer> repeatRetMap = rdd.flatMap(new FlatMapFunction<Row, String>() {
            public Iterable<String> call(Row row) throws Exception {
                HashMap<String, Object> rowMap = formatRowMap(row);
                List<String> sList = new ArrayList<String>();
                String[] colList = cols.split(",");
                for (String col : colList) {
                    sList.add(col + "@" + rowMap.get(col));
                }
                return sList;
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);

            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).map(new Function<Tuple2<String,Integer>, Map<String, Integer>>() {
            public Map<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                Map<String, Integer> retMap = new HashMap<String, Integer>();
                if (stringIntegerTuple2._2 > 1) {
                    retMap.put(stringIntegerTuple2._1, stringIntegerTuple2._2);
                }
                return retMap;
            }
        }).reduce(new Function2<Map<String, Integer>, Map<String, Integer>, Map<String, Integer>>() {
            public Map<String, Integer> call(Map<String, Integer> stringIntegerMap, Map<String, Integer> stringIntegerMap2) throws Exception {
                stringIntegerMap.putAll(stringIntegerMap2);
                return stringIntegerMap;
            }
        });

        for (Map.Entry<String, Integer> entry : repeatRetMap.entrySet()) {
            if (entry.getValue() > 1) {
                System.out.println("重复值为:" + entry.getKey() + ", 重复个数" + entry.getValue());
            }
        }
    }

    private String formatPartition(String partition) {
        String format = "";
        if (partition.startsWith("pt") || partition.startsWith("dt")) {
            String[] items = partition.split("=");
            for (int i = 0; i < items.length; i++) {
                if (items[i].equals("pt") || items[i].equals("dt")) {
                    format += items[i];
                } else {
                    format += "=‘" + items[i] + "‘";
                }
            }
        } else {
            String[] keys;
            if (partition.contains("w=")){
                keys = new String[] {"year", "week"};
                partition = partition.replace("w=", "");
            }
            else{
                keys = new String[] {"year","month","day", "hour"};
            }
            String[] items = partition.split("/");
            for(int i=0; i<items.length; i++) {
                if (i == items.length-1) {
                    format += keys[i] + "=‘" + items[i] + "‘";
                } else {
                    format += keys[i] + "=‘" + items[i] + "‘ and ";
                }
            }
        }
        return format;
    }

    private HashMap<String, Object> formatRowMap(Row row){
        HashMap<String, Object> rowMap = new HashMap<String, Object>();
        try {
            for (int i=0; i<row.schema().fields().length; i++) {
                String colName = row.schema().fields()[i].name();
                Object colValue = row.get(i);
                rowMap.put(colName, colValue);

            }
        }catch (Exception ex) {
            ex.printStackTrace();
        }
        return rowMap;
    }

    public static void main(String[] args) {
        String db = args[0];
        String tb = args[1];
        String pt = args[2];
        String cols = args[3];
        SparkDuplicate sparkDuplicate = new SparkDuplicate(db, tb, pt, cols);
        sparkDuplicate.duplicate();
    }
}

4、运行方式

提交任务脚本

#!/bin/bash
source /etc/profile
source ~/.bash_profile
db=$1
table=$2
partition=$3
cols=$4
spark-submit     --queue=root.zhiliangbu_prod_datamonitor     --driver-memory 500M     --executor-memory 13G     --num-executors 50     spark-duplicate-1.0-SNAPSHOT-jar-with-dependencies.jar ${db} ${table} ${partition} ${cols}

运行:

sh run.sh gulfstream_ods g_order 2017/07/11 area,type

结果

重复值为:[email protected]179, 重复个数225                                                  
重复值为:[email protected]80, 重复个数7398
重复值为:[email protected]82, 重复个数69823
重复值为:[email protected]81, 重复个数98317
重复值为:[email protected]84, 重复个数91775
重复值为:[email protected]83, 重复个数72053
重复值为:[email protected]180, 重复个数2362
重复值为:[email protected]86, 重复个数264487
重复值为:[email protected]181, 重复个数2927
重复值为:[email protected]85, 重复个数230484
重复值为:[email protected]88, 重复个数87527
重复值为:[email protected]87, 重复个数74987
重复值为:[email protected]89, 重复个数130297
重复值为:[email protected]188, 重复个数24463
重复值为:[email protected]189, 重复个数15699
重复值为:[email protected]186, 重复个数13517
重复值为:[email protected]187, 重复个数4774
重复值为:[email protected]184, 重复个数5022
重复值为:[email protected]185, 重复个数6737
重复值为:[email protected]182, 重复个数12705
重复值为:[email protected]183, 重复个数18961
重复值为:[email protected]289, 重复个数20715
重复值为:[email protected]168, 重复个数15179
重复值为:[email protected]169, 重复个数1276
重复值为:[email protected]91, 重复个数31664
重复值为:[email protected]90, 重复个数61261
重复值为:[email protected]93, 重复个数32496
重复值为:[email protected]92, 重复个数55877
重复值为:[email protected]95, 重复个数40933
重复值为:[email protected]94, 重复个数32564
重复值为:[email protected]290, 重复个数300
重复值为:[email protected]97, 重复个数21405
重复值为:[email protected]170, 重复个数37696
重复值为:[email protected]291, 重复个数212
重复值为:[email protected]96, 重复个数12442
重复值为:[email protected]99, 重复个数2526
重复值为:[email protected]98, 重复个数17456
重复值为:[email protected]298, 重复个数12688
重复值为:[email protected]177, 重复个数17285
重复值为:[email protected]178, 重复个数11511
重复值为:[email protected]299, 重复个数6622
重复值为:[email protected]175, 重复个数9573
重复值为:[email protected]296, 重复个数2416
重复值为:[email protected]176, 重复个数8109
重复值为:[email protected]297, 重复个数27915
重复值为:[email protected]173, 重复个数58942
重复值为:[email protected]294, 重复个数18842
重复值为:[email protected]295, 重复个数3482
重复值为:[email protected]174, 重复个数31452
重复值为:[email protected]292, 重复个数11436
重复值为:[email protected]171, 重复个数656
重复值为:[email protected]172, 重复个数31557
重复值为:[email protected]293, 重复个数1726
重复值为:[email protected]1, 重复个数288479
重复值为:[email protected]0, 重复个数21067365

 

以上是关于使用spark对hive表中的多列数据判重的主要内容,如果未能解决你的问题,请参考以下文章

Spark 不显示 Hive 表中的数据

Spark 不使用 Hive 分区外部表中的分区信息

如何使用 Spark SQL 识别 hive 表中的分区列

建立Hive和Hbase的映射关系,通过Spark将Hive表中数据导入ClickHouse

使用 Java 将数据存储为 Apache Spark 中的配置单元表

如何对具有多列的表中的数据进行分组[重复]