详解 Spark 中的 Bucketing

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解 Spark 中的 Bucketing相关的知识,希望对你有一定的参考价值。

参考技术A Bucketing 就是利用 buckets(按列进行分桶)来决定数据分区(partition)的一种优化技术,它可以帮助在计算中避免数据交换(avoid data shuffle)。并行计算的时候shuffle常常会耗费非常多的时间和资源.

Bucketing 的基本原理比较好理解,它会根据你指定的列(可以是一个也可以是多个)计算哈希值,然后具有相同哈希值的数据将会被分到相同的分区。

Bucket的最终目的也是实现分区,但是和Partition的原理不同,当我们根据指定列进行Partition的时候,Spark会根据列的名字对数据进行分区(如果没有指定列名则会根据一个随机信息对数据进行分区)。Bucketing的最大不同在于它使用了指定列的哈希值,这样可以保证具有相同列值的数据被分到相同的分区。

目前在使用 bucketBy 的时候,必须和 sortBy,saveAsTable 一起使用,如下。这个操作其实是将数据保存到了文件中(如果不指定path,也会保存到一个临时目录中)。

数据分桶保存之后,我们才能使用它。

在一个SparkSession内,保存之后你可以通过如下命令通过表名获取其对应的DataFrame.

其中spark是一个SparkSession对象。获取之后就可以使用DataFrame或者在SQL中使用表。

如果你要使用历史保存的数据,那么就不能用上述方法了,也不能像读取常规文件一样使用 spark.read.parquet() ,这种方式读进来的数据是不带bucket信息的。正确的方法是利用CREATE TABLE 语句,详情可用参考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html

示例如下:

在我们join两个表的时候,如果两个表最好按照相同的列划分成相同的buckets,就可以完全避免shuffle。根据前面所述的hash值计算方法,两个表具有相同列值的数据会存放在相同的机器上,这样在进行join操作时就不需要再去和其他机器通讯,直接在本地完成计算即可。假设你有左右两个表,各有两个分区,那么join的时候实际计算就是下图的样子,两个机器进行计算,并且计算后分区还是2.

而当需要shuffle的时候,会是这样的,

细心的你可能发现了,上面两个分区对应两个Executor,下面shuffle之后对应的怎么成了三个Executor了?没错,当数据进行shuffle之后,分区数就不再保持和输入的数据相同了,实际上也没有必要保持相同。

我们考虑的是大数据表的连接,本地测试的时候一般使用小的表,所以逆序需要将小表自动广播的配置关掉。如果开启小表广播,那么两个小表的join之后分区数是不会变的,例如:

关闭配置的命令如下:

正常情况下join之后分区数会发生变化:

这个200其实就是 "spark.sql.shuffle.partitions" 配置的值,默认就是200. 所以如果在Join过程中出现了shuffle,join之后的分区一定会变,并且变成spark.sql.shuffle.partitions的值。通常你需要根据自己的集群资源修改这个值,从而优化并行度,但是shuffle是不可避免的。

实际测试结果如下:

Spark依然会利用一些Bucekt的信息,但具体怎么执行目前还不太清楚,还是保持一致的好。

另外,如果你spark job的可用计算核心数小于Bucket值,那么从文件中读取之后Bucekt值会变,就是说bucket的数目不会超过你能使用的最大计算核数。

在处理null值的时候,我们可能会用到一些特殊的函数或者符号,如下表所示。但是在使用bucket的时候这里有个坑,一定要躲过。join的时候千万不要使用 <=> 符号,使用之后spark就会忽略bucket信息,继续shuffle数据,原因可能和hash计算有关。

原文连接

Spark Streaming与Spark SQL结合操作详解

Spark Streaming最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通 过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操 作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:每隔10秒,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类 top3热门的商品。

数据输入模型:

--------------------------
姓名		商品分类		商品名称
alex 	bigdata		hadoop
alex 	bigdata		spark
jack 	bigdata		flink
alex 	bigdata		hadoop
alex 	language	java
pony 	language	python
jone 	language	java
lili 	bigdata		hadoop
--------------------------

第一步:mapToPair

第一步:mapToPair
<bigdata_hadoop,1>
<bigdata_spark,1>
<bigdata_flink,1>
...

第二步:reduceByKeyAndWindow

<bigdata_hadoop,5>
<bigdata_spark,2>
<bigdata_flink,4>
...

第三步:foreachRDD

foreachRDD -> DataFrame -> registerTempView -> sql

Java语言实现:

package com.kfk.spark.common;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 下午
 */
public class CommStreamingContext 

    public static JavaStreamingContext getJssc()
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        return new JavaStreamingContext(conf, Durations.seconds(2));
    

package com.kfk.spark.top_hot_product_project;

import com.kfk.spark.common.CommSparkSession;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/19
 * @time : 1:36 下午
 */
public class TopHotProduct 
    public static void main(String[] args) throws InterruptedException 
        JavaStreamingContext jssc = CommStreamingContext.getJssc();

        /**
         * 输入数据源模型
         * --------------------------
         * 姓名		商品分类		商品名称
         * alex 	bigdata		hadoop
         * alex 	bigdata		spark
         * jack 	bigdata		flink
         * alex 	bigdata		hadoop
         * alex 	language	java
         * pony 	language	python
         * jone 	language	java
         * lili 	bigdata		hadoop
         * --------------------------
         */
        JavaReceiverInputDStream<String> inputDstream = jssc.socketTextStream("bigdata-pro-m04",9999);

        /**
         * mapToPair
         * <bigdata_hadoop,1>
         * <bigdata_spark,1>
         * <bigdata_flink,1>
         */
        JavaPairDStream<String, Integer> pairDstream = inputDstream.mapToPair(new PairFunction<String, String, Integer>() 
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception 

                String[] line = lines.split(" ");
                return new Tuple2<>(line[1] + "_" + line[2], 1);
            
        );

        /**
         * reduceByKeyAndWindow
         * <bigdata_hadoop,5>
         * <bigdata_spark,2>
         * <bigdata_flink,4>
         */
        JavaPairDStream<String, Integer> windowPairStream = pairDstream.reduceByKeyAndWindow(
                new Function2<Integer, Integer, Integer>() 
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception 
                return v1+v2;
            
        , Durations.seconds(60),Durations.seconds(10));

        /**
         * foreachRDD
         */
        windowPairStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() 
            @Override
            public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRdd) throws Exception 

                // 转换成RDD[Row]
                JavaRDD<Row> countRowRdd = stringIntegerJavaPairRdd.map(new Function<Tuple2<String, Integer>, Row>() 
                    @Override
                    public Row call(Tuple2<String, Integer> tuple) throws Exception 

                        String category = tuple._1.split("_")[0];
                        String product = tuple._1.split("_")[1];
                        Integer productCount = tuple._2;
                        return RowFactory.create(category,product,productCount);
                    
                );

                List<StructField> structFields = new ArrayList<StructField>();
                structFields.add(DataTypes.createStructField("category",DataTypes.StringType,true));
                structFields.add(DataTypes.createStructField("product",DataTypes.StringType,true));
                structFields.add(DataTypes.createStructField("productCount",DataTypes.IntegerType,true));

                // 构造Schema
                StructType structType = DataTypes.createStructType(structFields);

                // 创建SparkSession
                SparkSession spark = CommSparkSession.getSparkSession();

                // 通过rdd和scheme创建DataFrame
                Dataset<Row> df = spark.createDataFrame(countRowRdd,structType);

                // 创建临时表
                df.createOrReplaceTempView("product_click");

                /**
                 * 统计每个种类下点击次数排名前3名的商品
                 */
                spark.sql("select category,product,productCount from product_click " +
                        "order by productCount desc limit 3").show();

                /**
                 * +--------+-------+------------+
                 * |category|product|productCount|
                 * +--------+-------+------------+
                 * | bigdata| hadoop|           4|
                 * | bigdata|  kafka|           2|
                 * | bigdata|  flink|           2|
                 * +--------+-------+------------+
                 */
            
        );

        jssc.start();
        jssc.awaitTermination();
    

Scala语言实现:

package com.kfk.spark.top_hot_product_project

import com.kfk.spark.common.CommSparkSessionScala, CommStreamingContextScala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType
import org.apache.spark.streaming.Seconds

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/19
 * @time : 3:59 下午
 */
object TopHotProductScala 
    def main(args: Array[String]): Unit = 
        val jssc = CommStreamingContextScala.getJssc

        /**
         * 输入数据源模型
         * --------------------------
         * 姓名		商品分类		商品名称
         * alex 	bigdata		hadoop
         * alex 	bigdata		spark
         * jack 	bigdata		flink
         * alex 	bigdata		hadoop
         * alex 	language	java
         * pony 	language	python
         * jone 	language	java
         * lili 	bigdata		hadoop
         * --------------------------
         */
        val inputDstream = jssc.socketTextStream("bigdata-pro-m04", 9999)

        /**
         * mapToPair
         * <bigdata_hadoop,1>
         * <bigdata_spark,1>
         * <bigdata_flink,1>
         */
        val pairDStream = inputDstream.map(x => (x.split(" ")(1) + "_" + x.split(" ")(2),1))

        /**
         * reduceByKeyAndWindow
         * <bigdata_hadoop,5>
         * <bigdata_spark,2>
         * <bigdata_flink,4>
         */
        val windowDStream = pairDStream.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(60),Seconds(10))

        windowDStream.foreachRDD(x => 

            // 转换成RDD[Row]
            val rowRDD = x.map(tuple => 
                val category = tuple._1.split("_")(0)
                val product = tuple._1.split("_")(1)
                val productCount = tuple._2
                Row(category,product,productCount)
            )

            // 构造Schema
            val structType = StructType(Array(
                StructField("category",StringType,true),
                StructField("product",StringType,true),
                StructField("productCount",IntegerType,true)
            ))

            // 创建SparkSession
            val spark = CommSparkSessionScala.getSparkSession()

            // 通过rdd和scheme创建DataFrame
            val df = spark.createDataFrame(rowRDD,structType)

            // 创建临时表
            df.createOrReplaceTempView("product_click")

            /**
             * 统计每个种类下点击次数排名前3名的商品
             */
            spark.sql("select category,product,productCount from product_click "
                    + "order by productCount desc limit 3").show()

            /**
             * +--------+-------+------------+
             * |category|product|productCount|
             * +--------+-------+------------+
             * | bigdata| hadoop|           4|
             * | bigdata|  kafka|           2|
             * | bigdata|  flink|           2|
             * +--------+-------+------------+
             */
        )

        jssc.start()
        jssc.awaitTermination()
    


测试数据源:

alex bigdata hadoop
alex bigdata spark
jack bigdata flink
alex bigdata hadoop
alex language java
pony language python
jone language java
lili bigdata hadoop
lili bigdata hive
lili bigdata hbase
lili bigdata hadoop
lili bigdata spark
lili bigdata flink
lili bigdata kafka
lili bigdata kafka

运行结果:

+--------+-------+------------+
|category|product|productCount|
+--------+-------+------------+
| bigdata| hadoop|           4|
| bigdata|  kafka|           2|
| bigdata|  flink|           2|
+--------+-------+------------+

以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!

CSDN 社区图书馆,开张营业! 深读计划,写书评领图书福利~

以上是关于详解 Spark 中的 Bucketing的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming与Spark SQL结合操作详解

详解 Spark 中的 Bucketing

Spark核心功能设计详解

spark RPC详解

Spark Streaming源码解读之Driver中的ReceiverTracker详解

Spark算子执行流程详解之三