WrapedArray 的 WrappedArray 到 java 数组

Posted

技术标签:

【中文标题】WrapedArray 的 WrappedArray 到 java 数组【英文标题】:WrappedArray of WrapedArray to java array 【发布时间】:2017-07-26 10:38:34 【问题描述】:

我有一列类型为 set,我使用 spark Dataset API 的 collect_set(),它返回一个包装数组的包装数组。我想要一个来自嵌套包装数组的所有值的单个数组。我该怎么做?

例如。 Cassandra 表:

Col1  
1,2,3
1,5

我正在使用 Spark Dataset API。row.get(0) 返回一个包装数组的包装数组。

【问题讨论】:

【参考方案1】:

假设您有 Dataset<Row> ds,其中有 value 列。

+-----------------------+
|value                  |
+-----------------------+
|[WrappedArray(1, 2, 3)]|
+-----------------------+

它有以下架构

root
 |-- value: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = false)

使用 UDF

如下定义UDF1

static UDF1<WrappedArray<WrappedArray<Integer>>, List<Integer>> getValue = new UDF1<WrappedArray<WrappedArray<Integer>>, List<Integer>>() 
      public List<Integer> call(WrappedArray<WrappedArray<Integer>> data) throws Exception 
        List<Integer> intList = new ArrayList<Integer>();
        for(int i=0; i<data.size(); i++)
            intList.addAll(JavaConversions.seqAsJavaList(data.apply(i)));
        
        return intList;
    
;

注册并致电UDF1,如下所示

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.callUDF;
import scala.collection.JavaConversions;

//register UDF
spark.udf().register("getValue", getValue, DataTypes.createArrayType(DataTypes.IntegerType));

//Call UDF
Dataset<Row> ds1  = ds.select(col("*"), callUDF("getValue", col("value")).as("udf-value"));
ds1.show();

使用爆炸功能

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

Dataset<Row> ds2 = ds.select(explode(col("value")).as("explode-value"));
ds2.show(false);

【讨论】:

是的,可以这样做,我尝试了其他方式,我分解了集合,然后使用 collect_set() 聚合它们,所以只有一个数组。您是在告诉我要分解 collect_set() 的结果。在这两种情况下,我都有一个担忧,即是否会对性能产生重大影响?这就是我选择扁平化的原因。你也可以指点我一些关于 spark+java(而不是 scala)+dataset api 的教程、书籍等 我编辑了我的答案以使用 UDF 获取 Array。希望这会有所帮助。【参考方案2】:

如果您有数据框,则可以使用 udf 来展平列表 下面是一个简单的例子

import spark.implicits._

import org.apache.spark.sql.functions._
//create a dummy data

val df = Seq(
  (1, List(1,2,3)),
  (1, List (5,7,9)),
  (2, List(4,5,6)),
  (2,List(7,8,9))
).toDF("id", "list")

val df1 = df.groupBy("id").agg(collect_set($"list").as("col1"))

df1.show(false)

df1 的输出:

+---+----------------------------------------------+
|id |col1                                          |
+---+----------------------------------------------+
|1  |[WrappedArray(1, 2, 3), WrappedArray(5, 7, 9)]|
|2  |[WrappedArray(7, 8, 9), WrappedArray(4, 5, 6)]|
+---+----------------------------------------------+


val testUDF = udf((list: Seq[Seq[Integer]]) => list.flatten)


df1.withColumn("newCol", testUDF($"col1")).show(false)

输出

+---+----------------------------------------------+------------------+
|id |col1                                          |newCol            |
+---+----------------------------------------------+------------------+
|1  |[WrappedArray(1, 2, 3), WrappedArray(5, 7, 9)]|[1, 2, 3, 5, 7, 9]|
|2  |[WrappedArray(7, 8, 9), WrappedArray(4, 5, 6)]|[7, 8, 9, 4, 5, 6]|
+---+----------------------------------------------+------------------+

我希望这会有所帮助!

【讨论】:

您能否发布一个 udf 的 java 等效代码。我在 Seq> 上看到了这个 flatten 函数,但无法正确使用它。 我希望这可以帮助***.com/questions/35348058/… 其实我想要flatten的实现,它不像Java中的list.flatten那么简单,可能是因为Scala更丰富。 flatten 的文档是单行的,对我来说没有意义:( 您可以编写一个 udf 并循环遍历数组,然后创建一个扁平的新数组。

以上是关于WrapedArray 的 WrappedArray 到 java 数组的主要内容,如果未能解决你的问题,请参考以下文章

既是3的倍数又是5的倍数都有哪些

一个三位数既是3的倍数,又是5的倍数。这样的三位数最小是啥

数组的创建,及数组的方法

cnn中的步长的目的和重要性是啥

物质的运动

多态的好处??