在 for 循环中构建 Spark sql 数据集

Posted

技术标签:

【中文标题】在 for 循环中构建 Spark sql 数据集【英文标题】:constructing Spark sql Datasets inside for loop 【发布时间】:2017-10-31 06:09:10 【问题描述】:

TestDF 是一个数据框。可以在 for 循环内编辑/更改 10 次吗?

Spark 没有选项可以编辑并保存在同一数据集中。

java 也没有动态变量赋值。

需要在 for 循环中执行类似“Dataset <Row>testDF+(i+1) = testDF+(i)”(动态变量)或“Dataset <Row>testDF = testDF”(在同一数据集中)的操作。

有什么方法可以在 for 循环中循环 spark DF 吗?

String[] arraytest = schemaString.split(";");

for (int i=0;i < arraytest .length;i++) 
    String fieldName = arraytest[i];

    Dataset<Row> testDF+(i+1) = testDF+(i)
       .withColumn(fieldName, 
           functions.when(functions.col(fieldName).equalTo(""),"-99")
           .otherwise(functions.col(fieldName)));
    

【问题讨论】:

【参考方案1】:

为此使用累加器

例如,您可以使用类似的东西

CollectionAccumulator<String> queryAccumulator = sparkSession.sparkContext().collectionAccumulator();


Dataset<Row> table= sparkSession.sql("select * from table");

    table.foreachPartition(partition->
        while(partition.hasNext())
            Row row = partition.next();
        String sql="select * from table where cond1= '"+row.getAs("cond1")+"' and cond2= '"+row.getAs("cond2")+"' order " +
                "by starttime desc limit 24";
            queryAccumulator.add(sql);

    );
    logger.info("Queries to be executed are ",queryAccumulator.value());
    Dataset<Row> limitedDataDf= queryAccumulator.value().stream().map(query-> sparkSession.sql(query)).reduce(Dataset::union).get();

    limitedDataDf.createOrReplaceTempView("SPARKJOBINFORMATION_LIMIT");

【讨论】:

以上是关于在 for 循环中构建 Spark sql 数据集的主要内容,如果未能解决你的问题,请参考以下文章

迭代多个 CSV 并加入 Spark SQL

Spark2 Java 数据集 Sql

sql 游标如何循环

spark sql map():_*函数

通过在 Spark 中列出该位置下的文件来避免“for 循环”

如何在for和if循环中获取spark scala数据帧的最后一行的第一列值