使用 Java 在 Apache Spark 中从数据集中复制一行 n 次
Posted
技术标签:
【中文标题】使用 Java 在 Apache Spark 中从数据集中复制一行 n 次【英文标题】:Replicating a row from a Dataset n times in Apache Spark using Java 【发布时间】:2017-09-29 00:35:14 【问题描述】:我正在尝试从数据集中复制一行 n 次并从中创建一个新数据集。但是,在复制时,我需要为每次复制更改列的值,因为最终存储时它将成为主键。
以下是来自 SO 帖子的 Scala 代码:Replicate Spark Row N-times
import org.apache.spark.sql.functions._
val result = singleRowDF
.withColumn("dummy", explode(array((1 until 100).map(lit): _*)))
.selectExpr(singleRowDF.columns: _*)
如何从 Java 中的值数组创建一列并将其传递给爆炸函数?建议很有帮助。
谢谢
【问题讨论】:
【参考方案1】:这是从数据集中复制一行 n 次的 Java 程序。
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.lit;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSample
public static void main(String[] args)
SparkSession spark = SparkSession
.builder()
.appName("SparkSample")
.master("local[*]")
.getOrCreate();
//Create Dataset
List<Tuple2<String,Double>> inputList = new ArrayList<Tuple2<String,Double>>();
inputList.add(new Tuple2<String,Double>("A",1.0));
Dataset<Row> df = spark.createDataset(inputList, Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF();
df.show(false);
//Java 8 style of creating Array. You can create by using for loop as well
int[] array = IntStream.range(0, 5).toArray();
//With Dummy Column
Dataset<Row> df1 = df.withColumn("dummy", explode(lit(array)));
df1.show(false);
//Drop Dummy Column
Dataset<Row> df2 = df1.drop(col("dummy"));
df2.show(false);
下面是这个程序的输出。
+---+---+
|_1 |_2 |
+---+---+
|A |1.0|
+---+---+
+---+---+-----+
|_1 |_2 |dummy|
+---+---+-----+
|A |1.0|0 |
|A |1.0|1 |
|A |1.0|2 |
|A |1.0|3 |
|A |1.0|4 |
+---+---+-----+
+---+---+
|_1 |_2 |
+---+---+
|A |1.0|
|A |1.0|
|A |1.0|
|A |1.0|
|A |1.0|
+---+---+
【讨论】:
感谢示例代码!我将检查它如何针对大量行(数百万行)进行扩展。以上是关于使用 Java 在 Apache Spark 中从数据集中复制一行 n 次的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Apache Spark ML API 中从“DataFrame”创建一个“Vector”?
如何在 Spark 中从 cassandra datastax 云中读取数据
在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错