使用 Java 在 Spark 2.0 中使用数据集的 GroupByKey
Posted
技术标签:
【中文标题】使用 Java 在 Spark 2.0 中使用数据集的 GroupByKey【英文标题】:GroupByKey with datasets in Spark 2.0 using Java 【发布时间】:2017-01-16 09:14:19 【问题描述】:我有一个包含如下数据的数据集:
|c1| c2|
---------
| 1 | a |
| 1 | b |
| 1 | c |
| 2 | a |
| 2 | b |
...
现在,我想得到如下分组的数据 (col1: String Key, col2: List):
| c1| c2 |
-----------
| 1 |a,b,c|
| 2 | a, b|
...
我认为使用 goupByKey 将是一个足够的解决方案,但我找不到任何示例,如何使用它。
谁能帮我找到解决方案,使用 groupByKey 或使用任何其他转换和操作的组合通过使用数据集而不是 RDD 来获得此输出?
【问题讨论】:
【参考方案1】:这是带有数据集的 Spark 2.0 和 Java 示例。
public class SparkSample
public static void main(String[] args)
//SparkSession
SparkSession spark = SparkSession
.builder()
.appName("SparkSample")
.config("spark.sql.warehouse.dir", "/file:C:/temp")
.master("local")
.getOrCreate();
//input data
List<Tuple2<Integer,String>> inputList = new ArrayList<Tuple2<Integer,String>>();
inputList.add(new Tuple2<Integer,String>(1, "a"));
inputList.add(new Tuple2<Integer,String>(1, "b"));
inputList.add(new Tuple2<Integer,String>(1, "c"));
inputList.add(new Tuple2<Integer,String>(2, "a"));
inputList.add(new Tuple2<Integer,String>(2, "b"));
//dataset
Dataset<Row> dataSet = spark.createDataset(inputList, Encoders.tuple(Encoders.INT(), Encoders.STRING())).toDF("c1","c2");
dataSet.show();
//groupBy and aggregate
Dataset<Row> dataSet1 = dataSet.groupBy("c1").agg(org.apache.spark.sql.functions.collect_list("c2")).toDF("c1","c2");
dataSet1.show();
//stop
spark.stop();
【讨论】:
很高兴我能帮上忙。【参考方案2】:在 Spark 2.0 中使用 DataFrame:
scala> val data = List((1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b")).toDF("c1", "c2")
data: org.apache.spark.sql.DataFrame = [c1: int, c2: string]
scala> data.groupBy("c1").agg(collect_list("c2")).collect.foreach(println)
[1,WrappedArray(a, b, c)]
[2,WrappedArray(a, b)]
【讨论】:
【参考方案3】:这会将表格读入数据集变量
Dataset<Row> datasetNew = dataset.groupBy("c1").agg(functions.collect_list("c2"));
datasetNew.show()
【讨论】:
以上是关于使用 Java 在 Spark 2.0 中使用数据集的 GroupByKey的主要内容,如果未能解决你的问题,请参考以下文章
Spark 2.0 groupBy 列,然后在 datetype 列上获取 max(date)
Apache Spark 2.0三种API的传说:RDDDataFrame和Dataset
SPARK 2.0:火花信息理论特征选择 java.lang.NoSuchMethodError:微风.linalg.DenseMatrix
如何在Spark提交中使用s3a和Apache spark 2.2(hadoop 2.8)?