javaAPI 中是不是有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>?

Posted

技术标签:

【中文标题】javaAPI 中是不是有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>?【英文标题】:Is there anyway in javaAPI to a Dataset<Row> to a map() and return a Dataset<Row>?javaAPI 中是否有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>? 【发布时间】:2020-04-22 19:14:28 【问题描述】:

我在 Java 8 中使用 spark-sql-2.4.1v。我有一个如下用例,

Dataset<Row> ds = //a Dataset<Row> read from DB

我需要根据另一个数据集的条目进行一些操作 即

List<String> codesList = Array.asList("code1","code2")
Dataset<Row> codes = sc.createDataSet(codesList , Encoders.bean(String.class))

我需要并行处理所有代码。为了做同样的事情,我正在尝试如下:

 Dataset<Row> ds_res =  codes.map( x_cod ->   //map throwing an error
        calcFunction(sparkSession, filePath, ds ,x_cod );
    ).reduce(new Function2<Dataset<Row> df1,Dataset<Row> df2) => df1.union(df2))

 ds_res .write().path(filePath).mode("append").save();

    public static Dataset<Row> calcFunction(sparkSession, filePath, ds ,x_cod )
         //some complex calculation based on x_cod 

        return ds_res ; // return ds_res  for further processing
    

如何使这项工作在集群上并行工作?

【问题讨论】:

您不能在一个数据帧上使用map 并在其中使用另一个数据帧。你需要映射codes吗?你不能改用codesList 吗(它仍然应该并行完成)? 当前您的calcFunction 返回一个数据框,对吗?您正在使用.reduce(new Function2&lt;Dataset&lt;Row&gt; df1,Dataset&lt;Row&gt; df2) =&gt; df1.union(df2)),因此您期望在单个数据帧上使用map 会得到一个数据帧列表。所以看起来你需要一个数据帧,使用map 来获取多个数据帧。这样做的唯一方法是使用不允许的嵌套数据框。也许您可以再次更新以使其更清晰? :) 我周末没有时间检查,但看起来你已经得到了一些答案。 :) 你应该能够做到这一点,得到一个包含所有数据集的数组和 union 的 reduce。在这种情况下,您不需要追加一个空数据集。 (另一种方法是有一个循环并附加到数据集,在这种情况下,您希望从一个空数据集开始并添加到该数据集。) 在 scala 中,你可以这样做:val result = for( toDate &lt;- dates)...res 在 for 循环的最后一行。这将为您提供列表。 【参考方案1】:

将列表编码为数据集是比编码更可行的选择。如果您打算使用 bean 类,您可以将其编码为 Dataset&lt;T&gt;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

public class ParallelizeArray 

    public static void main(String[] args) 
        final SparkSession sparkSess = Constant.getSparkSess();
        List<String> codesList = Arrays.asList("code1", "code2");
        final Dataset<String> dataFrame = sparkSess.createDataset(codesList, Encoders.STRING());
        dataFrame.write().mode(SaveMode.Append).csv("src/main/resources/paraArray");
    


或使用

final Encoder<Dataset> bean = Encoders.bean(Dataset.class);
Dataset<Row> ds_res = codes.map((MapFunction<String, Dataset>) x_cod -> calcFunction(sparkSess, filePath, ds ,x_cod),bean)
                .reduce((ReduceFunction<Dataset>) (df1, df2) -> df1.union(df2));



    public static Dataset<Row> calcFunction(SparkSession sparkSession, String filePath, Dataset<Row> ds ,String x_cod )
        Dataset<Row> ds_res = null;//some complex calculation based on x_cod
        return ds_res ; // return ds_res  for further processing
    

【讨论】:

这是在 Spark 2 中运行 map reduce 作业的新方法。无需一直编写 mapper 和 reducers。所有数据集现在都是 Spark 2.x 中的流 Spark 引擎将处理其余部分。 @BdEngineer 更新了 ans 以解决您的编译错误,但我仍然建议使用最新的 Spark API。 这是 calcFunction 函数的返回类型 不能为泛型创建编码器,例如Dataset 因此 MapFunction 和 ReduceFunction 是使用原始数据集实现的 让我们continue this discussion in chat。

以上是关于javaAPI 中是不是有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>?的主要内容,如果未能解决你的问题,请参考以下文章

python中是不是有将单词拆分为列表的功能? [复制]

Play 商店 2020:是不是有将应用从封闭测试“推广”到生产的审核流程?

目前是不是有将两个或多个字符串文字类型连接到 TypeScript 中的单个字符串文字类型?

R或Java:是不是有将邮政编码转换为纬度和经度并放在地图上的包?

是否有将范围移动到向量中的标准方法?

是否有将 JSON 嵌入 HTML 的标准?