javaAPI 中是不是有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>?
Posted
技术标签:
【中文标题】javaAPI 中是不是有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>?【英文标题】:Is there anyway in javaAPI to a Dataset<Row> to a map() and return a Dataset<Row>?javaAPI 中是否有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>? 【发布时间】:2020-04-22 19:14:28 【问题描述】:我在 Java 8 中使用 spark-sql-2.4.1v。我有一个如下用例,
Dataset<Row> ds = //a Dataset<Row> read from DB
我需要根据另一个数据集的条目进行一些操作 即
List<String> codesList = Array.asList("code1","code2")
Dataset<Row> codes = sc.createDataSet(codesList , Encoders.bean(String.class))
我需要并行处理所有代码。为了做同样的事情,我正在尝试如下:
Dataset<Row> ds_res = codes.map( x_cod -> //map throwing an error
calcFunction(sparkSession, filePath, ds ,x_cod );
).reduce(new Function2<Dataset<Row> df1,Dataset<Row> df2) => df1.union(df2))
ds_res .write().path(filePath).mode("append").save();
public static Dataset<Row> calcFunction(sparkSession, filePath, ds ,x_cod )
//some complex calculation based on x_cod
return ds_res ; // return ds_res for further processing
如何使这项工作在集群上并行工作?
【问题讨论】:
您不能在一个数据帧上使用map
并在其中使用另一个数据帧。你需要映射codes
吗?你不能改用codesList
吗(它仍然应该并行完成)?
当前您的calcFunction
返回一个数据框,对吗?您正在使用.reduce(new Function2<Dataset<Row> df1,Dataset<Row> df2) => df1.union(df2))
,因此您期望在单个数据帧上使用map
会得到一个数据帧列表。所以看起来你需要一个数据帧,使用map
来获取多个数据帧。这样做的唯一方法是使用不允许的嵌套数据框。也许您可以再次更新以使其更清晰? :)
我周末没有时间检查,但看起来你已经得到了一些答案。 :)
你应该能够做到这一点,得到一个包含所有数据集的数组和 union
的 reduce。在这种情况下,您不需要追加一个空数据集。 (另一种方法是有一个循环并附加到数据集,在这种情况下,您希望从一个空数据集开始并添加到该数据集。)
在 scala 中,你可以这样做:val result = for( toDate <- dates)...
和 res
在 for 循环的最后一行。这将为您提供列表。
【参考方案1】:
将列表编码为数据集是比编码更可行的选择。如果您打算使用 bean 类,您可以将其编码为 Dataset<T>
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
public class ParallelizeArray
public static void main(String[] args)
final SparkSession sparkSess = Constant.getSparkSess();
List<String> codesList = Arrays.asList("code1", "code2");
final Dataset<String> dataFrame = sparkSess.createDataset(codesList, Encoders.STRING());
dataFrame.write().mode(SaveMode.Append).csv("src/main/resources/paraArray");
或使用
final Encoder<Dataset> bean = Encoders.bean(Dataset.class);
Dataset<Row> ds_res = codes.map((MapFunction<String, Dataset>) x_cod -> calcFunction(sparkSess, filePath, ds ,x_cod),bean)
.reduce((ReduceFunction<Dataset>) (df1, df2) -> df1.union(df2));
public static Dataset<Row> calcFunction(SparkSession sparkSession, String filePath, Dataset<Row> ds ,String x_cod )
Dataset<Row> ds_res = null;//some complex calculation based on x_cod
return ds_res ; // return ds_res for further processing
【讨论】:
这是在 Spark 2 中运行 map reduce 作业的新方法。无需一直编写 mapper 和 reducers。所有数据集现在都是 Spark 2.x 中的流 Spark 引擎将处理其余部分。 @BdEngineer 更新了 ans 以解决您的编译错误,但我仍然建议使用最新的 Spark API。 这是 calcFunction 函数的返回类型 不能为泛型创建编码器,例如Dataset以上是关于javaAPI 中是不是有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>?的主要内容,如果未能解决你的问题,请参考以下文章
Play 商店 2020:是不是有将应用从封闭测试“推广”到生产的审核流程?
目前是不是有将两个或多个字符串文字类型连接到 TypeScript 中的单个字符串文字类型?