在 for 循环中构建 Spark sql 数据集
Posted
技术标签:
【中文标题】在 for 循环中构建 Spark sql 数据集【英文标题】:constructing Spark sql Datasets inside for loop 【发布时间】:2017-10-31 06:09:10 【问题描述】:TestDF 是一个数据框。可以在 for 循环内编辑/更改 10 次吗?
Spark 没有选项可以编辑并保存在同一数据集中。
java 也没有动态变量赋值。
需要在 for 循环中执行类似“Dataset <Row>
testDF+(i+1) = testDF+(i)”(动态变量)或“Dataset <Row>
testDF = testDF”(在同一数据集中)的操作。
有什么方法可以在 for 循环中循环 spark DF 吗?
String[] arraytest = schemaString.split(";");
for (int i=0;i < arraytest .length;i++)
String fieldName = arraytest[i];
Dataset<Row> testDF+(i+1) = testDF+(i)
.withColumn(fieldName,
functions.when(functions.col(fieldName).equalTo(""),"-99")
.otherwise(functions.col(fieldName)));
【问题讨论】:
【参考方案1】:为此使用累加器
例如,您可以使用类似的东西
CollectionAccumulator<String> queryAccumulator = sparkSession.sparkContext().collectionAccumulator();
Dataset<Row> table= sparkSession.sql("select * from table");
table.foreachPartition(partition->
while(partition.hasNext())
Row row = partition.next();
String sql="select * from table where cond1= '"+row.getAs("cond1")+"' and cond2= '"+row.getAs("cond2")+"' order " +
"by starttime desc limit 24";
queryAccumulator.add(sql);
);
logger.info("Queries to be executed are ",queryAccumulator.value());
Dataset<Row> limitedDataDf= queryAccumulator.value().stream().map(query-> sparkSession.sql(query)).reduce(Dataset::union).get();
limitedDataDf.createOrReplaceTempView("SPARKJOBINFORMATION_LIMIT");
【讨论】:
以上是关于在 for 循环中构建 Spark sql 数据集的主要内容,如果未能解决你的问题,请参考以下文章