尝试访问 UDF 中的广播变量时​​,Spark 无限期冻结

Posted

技术标签:

【中文标题】尝试访问 UDF 中的广播变量时​​,Spark 无限期冻结【英文标题】:Spark freezes indefinitely when trying to access broadcast variable inside UDF 【发布时间】:2020-02-11 01:50:46 【问题描述】:
 public Dataset<Row> myfunc(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb)
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

    Broadcast<Dataset<Row>> excRateBrdCast = jsc.broadcast(dfa); // very small local test DS . 5 rows 4 cols
    log.info(" ##### " + excRateBrdCast.value().count()); //works
    spark.udf().register("someudf", new UDF4<Date, String, String, Double, Double>()
        @Override
        public Double call(Date cola, String colb, String colc, Double original)
            Dataset<Row> excBrdcastRecv = excRateBrdCast.value();
            Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
            if(!colc.equals("SOME"))
                Dataset<Row> ds6 = excBrdcastRecv.filter(row -> 
                    boolean cond1 = row.getAs("cola").toString().equals(cola.toString());
                    boolean cond2 = row.getAs("colb").toString().equals(colb);
                    return cond1 && cond2;
                );
                Double val9 = ds6.first().getAs("colc"); //Spark in local mode freezes here . No error. Just dont proceed
                newRate = newRate*val9;
            
            return newRate;
        
    , DataTypes.DoubleType);

    Dataset<Row> newDs = dfb.withColumn
            ("addedColumn", callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));

    return newDs;

几点建议-

    如果我删除对 excRateBrdCast.value() 的访问权并发送回硬编码值,它可以正常工作。 在 java 中使用 spark 2.11 所有数据集都是非常小的本地测试数据集,因此大小不是问题。 在尝试访问广播变量 Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result 时,处理卡住了,没有出现 ant 错误。与调用动作时相同 日志卡在INFO DAGScheduler - Submitting 1 missing tasks from ResultStage 46 (MapPartitionsRDD[267] at first at PositionsControls.java:178) (first 15 tasks are for partitions Vector(0)) INFO TaskSchedulerImpl - Adding task set 46.0 with 1 tasks 在本地模式下运行

【问题讨论】:

【参考方案1】:

上面的代码中有一个经典错误。所以变量excRateBrdCast 被广播了。然后注册new UDF4。 Spark 实际上会在多个执行器机器上执行该 UDF。在这些机器上,spark 将无法看到excRateBrdCast,因此它将永远停止,等待excRateBrdCast.value() 到达。所以我们需要如何将excRateBrdCast 传递给UDF。它们在代码块中的连续出现是一种欺骗。

所以你需要做的是在另一个类中学习 UDF。并且不要进行内联初始化。在 UDF 中定义一个参数化构造函数,该构造函数接受广播变量 excRateBrdCast 并在初始化期间传递它。

然后就可以看到广播变量了。

【讨论】:

以上是关于尝试访问 UDF 中的广播变量时​​,Spark 无限期冻结的主要内容,如果未能解决你的问题,请参考以下文章

如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?

如何在不指定每一列的情况下将整行作为参数传递给 Spark(Java)中的 UDF?

Pyspark UDF 广播变量未定义仅在由单独脚本导入时

sparksql缓存表能做广播变量吗

Scala Spark 中的 udf 运行时错误

rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值