无法使用 Scala 在 Apache Spark 中执行用户定义的函数

Posted

技术标签:

【中文标题】无法使用 Scala 在 Apache Spark 中执行用户定义的函数【英文标题】:Failed to execute user defined function in Apache Spark using Scala 【发布时间】:2017-06-15 14:35:47 【问题描述】:

我有以下数据框:

+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
|   time_stamp_0|sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6| len_7|count|
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
|06:36:16.293711|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58| 65161|  130|
|06:36:16.293729|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58| 65913|  130|
|06:36:16.293743|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|131073|  130|
|06:36:16.293765|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|196233|  130|
|06:36:16.293783|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|196985|  130|
|06:36:16.293798|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|262145|  130|
|06:36:16.293820|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|327305|  130|
|06:36:16.293837|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|328057|  130|
|06:36:16.293851|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|393217|  130|
|06:36:16.293873|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|458377|  130|
|06:36:16.293890|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|459129|  130|
|06:36:16.293904|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|524289|  130|
|06:36:16.293926|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|589449|  130|
|06:36:16.293942|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|590201|  130|
|06:36:16.293956|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|655361|  130|
|06:36:16.293977|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|720521|  130|
|06:36:16.293994|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|721273|  130|
|06:36:16.294007|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|786433|  130|
|06:36:16.294028|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|851593|  130|
|06:36:16.294045|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|852345|  130|
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
only showing top 20 rows

我必须向我的dataframe 添加功能和标签来预测计数值。但是,当我运行代码时,我会看到以下错误:

Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

我还 cast(IntegerType) 我的所有功能,但再次发生错误。这是我的代码:

val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)

       val toVec9 = udf[Vector, Int, Int, String, String, Int, Int, Int, Int, Int]  (a, b, c, d, e, f, g, h, i) =>
              val e3 = c match 
                case "10.0.0.1" => 1
                case "10.0.0.2" => 2
                case "10.0.0.3" => 3
              

              val e4 = d match 
                case "10.0.0.1" => 1
                case "10.0.0.2" => 2
                case "10.0.0.3" => 3
              
              Vectors.dense(a, b, e3, e4, e, f, g, h, i)
            

            val final_df = Dataframe.withColumn(
              "features",
              toVec9(
                // casting into Timestamp to parse the string, and then into Int
                $"time_stamp_0".cast(TimestampType).cast(IntegerType),
                $"count".cast(IntegerType),
                $"sender_ip_1",
                $"receiver_ip_2",
                $"s_port_3".cast(IntegerType),
                $"r_port_4".cast(IntegerType),
                $"acknum_5".cast(IntegerType),
                $"winnum_6".cast(IntegerType),
                $"len_7".cast(IntegerType)
              )
            ).withColumn("label", (Dataframe("count"))).select("features", "label")

final_df.show()

val trainingTest = final_df.randomSplit(Array(0.8, 0.2))
val TrainingDF = trainingTest(0).toDF()
val TestingDF=trainingTest(1).toDF()
TrainingDF.show()
TestingDF.show()

我的依赖也是:

libraryDependencies ++= Seq(
  "co.theasi" %% "plotly" % "0.2.0",
  "org.apache.spark" %% "spark-core" % "2.1.1",
  "org.apache.spark" %% "spark-sql" % "2.1.1",
  "org.apache.spark" %% "spark-hive" % "2.1.1",
  "org.apache.spark" %% "spark-streaming" % "2.1.1",
  "org.apache.spark" %% "spark-mllib" % "2.1.1"
)

最有趣的一点是,如果我在代码的最后部分将所有cast(IntegerType) 更改为cast(TimestampType).cast(IntegerType),错误就会消失,输出将是这样的:

+--------+-----+
|features|label|
+--------+-----+
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
+--------+-----+

更新:应用@Ramesh Maharjan 解决方案后,我的数据框的结果运行良好,但是,每当我尝试将我的 final_df 数据框拆分为训练和测试时,结果如下所示,我仍然有有空行的同样问题。

+--------------------+-----+
|            features|label|
+--------------------+-----+
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
+--------------------+-----+

你能帮帮我吗?

【问题讨论】:

你能解释一下你想用 udf 做什么吗?因为你没有像上面那样在scala中写udf? @ShankarKoirala:感谢您的回答,我认为您是对的,需要更多解释。这个问题是根据这个问题。 ***.com/questions/44563672/… 你已经得到答案了不是吗? @ShankarKoirala:我需要特征向量来预测“计数”值,并且 vectore 的所有成员都应该是整数,所以我使用 udf,因为我的一些变量是字符串,我需要以某种方式将它们转换为整数。我的代码的原始数据也在我以前的问题中。 @ShankarKoirala:当然不是,这是关于如何提取数据的问题。好的,我将在这里添加我所有的代码。 【参考方案1】:

我没有看到您的问题代码中生成了count column。除了count 专栏@Shankar 的回答应该可以得到你想要的结果。

以下错误是由于 @Shankar 在他的回答中纠正了 udf 函数的错误定义。

Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

以下错误是由于versionspark-mllib libraryspark-core libraryspark-sql library 不匹配造成的。它们都应该是相同的版本。

error: Caused by: org.apache.spark.SparkException: Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen‌​eratedIterator.proce‌​ssNext(Unknown Source) 

我希望解释清楚,希望您的问题尽快得到解决。

已编辑

您还没有按照@Shankar 的建议更改udf 函数。也加.trim,因为我可以看到一些空格

val toVec9 = udf ((a: Int, b: Int, c: String, d: String, e: Int, f: Int, g: Int, h: Int, i: Int) =>
  
  val e3 = c.trim match 
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  
  val e4 = d.trim match 
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  
  Vectors.dense(a, b, e3, e4, e, f, g, h, i)
)

查看您的依赖项,您正在使用%%,它告诉sbt 在您的系统中下载与scala 版本打包的dependencies。那应该没问题,但由于您仍然遇到错误,我想将 dependencies 更改为

libraryDependencies ++= Seq(
  "co.theasi" %% "plotly" % "0.2.0",
  "org.apache.spark" % "spark-core_2.11" % "2.1.1",
  "org.apache.spark" % "spark-sql_2.11" % "2.1.1",
  "org.apache.spark" %% "spark-hive" % "2.1.1",
  "org.apache.spark" % "spark-streaming_2.11" % "2.1.1",
  "org.apache.spark" % "spark-mllib_2.11" % "2.1.1"

)

【讨论】:

感谢您的回答,但是,您提到的任何原因都不是导致该错误的原因。我已经更新了这个问题。我有相同版本的 spark-core 库和 spark-sql 库。我的代码中也存在计数。我只是添加它以查看它的存在。 @Queen 感谢您使用计数列更新问题。我已经更新了我的答案。如果您仍然遇到问题,请告诉我。谢谢 感谢您的回答。它工作正常,我会接受这个答案作为正确答案。但是作为最后一个问题,当我将 final_df 拆分为训练和测试部分时,我遇到了最后一个相同的错误。请检查为什么培训和测试再次为空。我再次更新了问题。 那应该是另一个问题。但我会帮助你。回答我提出的每一个问题。问题中的代码与您的系统中的代码一样吗?顺序? @非常感谢您的好意。是的,顺序与我系统中的代码相同。 :)【参考方案2】:

我认为这就是您创建 udf 的方式

val toVec9 = udf ((a: Int, b: Int, c: String, d: String, e: Int, f: Int, g: Int, h: Int, i: Int) =>

  val e3 = c match 
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  

  val e4 = d match 
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  
  Vectors.dense(a, b, e3, e4, e, f, g, h, i)

)

并将其用作

val final_df = Dataframe.withColumn(
              "features",
              toVec9(
                // casting into Timestamp to parse the string, and then into Int
                $"time_stamp_0".cast(TimestampType).cast(IntegerType),
                $"count".cast(IntegerType),
                $"sender_ip_1",
                $"receiver_ip_2",
                $"s_port_3".cast(IntegerType),
                $"r_port_4".cast(IntegerType),
                $"acknum_5".cast(IntegerType),
                $"winnum_6".cast(IntegerType),
                $"len_7".cast(IntegerType)
              )
            ).withColumn("label", (Dataframe("count"))).select("features", "label")

希望这会有所帮助!

【讨论】:

谢谢,同样的错误: 原因:org.apache.spark.SparkException: 无法执行用户定义的函数(anonfun$15: (int, int, string, string, int, int, int , int, int) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 你对铸件有什么想法吗?

以上是关于无法使用 Scala 在 Apache Spark 中执行用户定义的函数的主要内容,如果未能解决你的问题,请参考以下文章

Scala 中的 Apache Spark 日志记录

无法在 Spark-2.2.0 - Scala-2.11.8 上运行单元测试(scalatest)

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

在 Bash 脚本中执行 Apache Spark (Scala) 代码

无法在 mleap 中序列化 apache spark 变压器

使用 IntelliJ idea 的 Scala 工作表作为 Apache Spark 的 Scala REPL