合并 Spark DataFrame 中的多列 [Java]
Posted
技术标签:
【中文标题】合并 Spark DataFrame 中的多列 [Java]【英文标题】:Merge multiple columns in a Spark DataFrame [Java] 【发布时间】:2016-06-09 13:02:16 【问题描述】:如何将 DataFrame 中的多列(例如 3 列)组合成单列(在新 DataFrame 中),其中每一行都成为 Spark DenseVector?与thread 类似,但在 Java 中并进行了以下提到的一些调整。
我尝试使用这样的 UDF:
private UDF3<Double, Double, Double, Row> toColumn = new UDF3<Double, Double, Double, Row>()
private static final long serialVersionUID = 1L;
public Row call(Double first, Double second, Double third) throws Exception
Row row = RowFactory.create(Vectors.dense(first, second, third));
return row;
;
然后注册UDF:
sqlContext.udf().register("toColumn", toColumn, dataType);
dataType
在哪里:
StructType dataType = DataTypes.createStructType(new StructField[]
new StructField("bla", new VectorUDT(), false, Metadata.empty()),
);
当我在具有 3 列的 DataFrame 上调用此 UDF 并打印出新 DataFrame 的架构时,我得到了:
root
|-- features: struct (nullable = true)
| |-- bla: vector (nullable = false)
这里的问题是我需要一个向量在外部,而不是在结构内。 像这样的:
root
|-- features: vector (nullable = true)
我不知道如何得到这个,因为 register
函数要求 UDF 的返回类型为 DataType
(反过来,它不提供 VectorType)
【问题讨论】:
【参考方案1】:您实际上是使用此数据类型手动将向量类型嵌套到结构中:
new StructField("bla", new VectorUDT(), false, Metadata.empty()),
如果你去掉外部的 StructField,你会得到你想要的。当然,在这种情况下,您需要稍微修改一下函数定义的签名。即需要返回 Vector 类型。
请参阅下面的具体示例,以简单的 JUnit 测试的形式说明我的意思。
package sample.spark.test;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.junit.Test;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ToVectorTest implements Serializable
private static final long serialVersionUID = 2L;
private UDF3<Double, Double, Double, Vector> toColumn = new UDF3<Double, Double, Double, Vector>()
private static final long serialVersionUID = 1L;
public Vector call(Double first, Double second, Double third) throws Exception
return Vectors.dense(first, second, third);
;
@Test
public void testUDF()
// context
final JavaSparkContext sc = new JavaSparkContext("local", "ToVectorTest");
final SQLContext sqlContext = new SQLContext(sc);
// test input
final DataFrame input = sqlContext.createDataFrame(
sc.parallelize(
Arrays.asList(
RowFactory.create(1.0, 2.0, 3.0),
RowFactory.create(4.0, 5.0, 6.0),
RowFactory.create(7.0, 8.0, 9.0),
RowFactory.create(10.0, 11.0, 12.0)
)),
DataTypes.createStructType(
Arrays.asList(
new StructField("feature1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("feature2", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("feature3", DataTypes.DoubleType, false, Metadata.empty())
)
)
);
input.registerTempTable("input");
// expected output
final Set<Vector> expectedOutput = new HashSet<>(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0),
Vectors.dense(4.0, 5.0, 6.0),
Vectors.dense(7.0, 8.0, 9.0),
Vectors.dense(10.0, 11.0, 12.0)
));
// processing
sqlContext.udf().register("toColumn", toColumn, new VectorUDT());
final DataFrame outputDF = sqlContext.sql("SELECT toColumn(feature1, feature2, feature3) AS x FROM input");
final Set<Vector> output = new HashSet<>(outputDF.toJavaRDD().map(r -> r.<Vector>getAs("x")).collect());
// evaluation
assertEquals(expectedOutput.size(), output.size());
for (Vector x : output)
assertTrue(expectedOutput.contains(x));
// show the schema and the content
System.out.println(outputDF.schema());
outputDF.show();
sc.stop();
【讨论】:
这正是我所需要的。不知何故,我设法不考虑从 UDF 返回 Vector 并使用 VectorUDT 注册函数。谢谢罗伯特!以上是关于合并 Spark DataFrame 中的多列 [Java]的主要内容,如果未能解决你的问题,请参考以下文章
Spark dataframe 中某几列合并成vector或拆分
Spark dataframe 中某几列合并成vector或拆分