如何遍历 spark 数据集并更新 Java 中的列值?

Posted

技术标签:

【中文标题】如何遍历 spark 数据集并更新 Java 中的列值?【英文标题】:How to iterate through a spark dataset and update a column value in Java? 【发布时间】:2018-03-20 22:21:39 【问题描述】:

我正在开发 POC,我必须使用令牌更新数据库中的帐号。我将数据读入数据集 dsRecords(大约 2M 条记录)。我有另一个例程捕获了不同的帐号并获得了令牌,映射存储在 HashMap 中。

Dataset<Row> applySwappedTokens(Dataset<Row> dsRecords, Map<String, String> mappedTokens)

现在,我必须遍历数据集以执行以下操作 - 1. 读取帐号列 (accountNumber) 值并使用 mappedTokens 中的令牌值更新(我知道数据集是不可变的。因此,更新数据集意味着创建具有更新行的数据集副本)它。这可以通过 JOIN 或其他操作来实现,但由于第二个任务,我没有为此付出努力。 2. 读取另一个 XML blob 列并找到帐号并更新它。

到目前为止,我尝试过的所有选项都由于不可序列化的代码而导致编译时错误或测试编译错误。大多数在线资源都使用 Scala 而不是 Java。请帮忙。

火花 2.1 Java 8

方法 1 - 由于序列化错误而无法测试。

Dataset<Row> output = sparkSession.sqlContext().createDataFrame(dsRecords.javaRDD().map(row ->  
            return RowFactory.create(row.get(0), row.get(1), row.get(2), swapToken(row.get(3)),row.get(4));
        ), dsRecords.schema());

        return output;

String swapToken(Object inputToken) 
        return mappedTokens.get(inputToken);//mappedToken will have to be instance field.
    

方法 2 - 不完整。

dsRecords.foreach((ForeachFunction<Row>) row -> 
            Integer index = row.fieldIndex("accountNumber");
            String pan = row.getString(index);
            String swap = this.swapToken(pan);
            //TODO: create a dataset with rows from dsRecords but swap value.

        );

方法 3 - 使用带有地图功能的 UDF

创建一个 UDF2(接受 2 个输入参数,即 accountNumber 和 mappedToken 并返回 token)。看来UDF只能取列值

更新 1 - UDF 所以,我实现了UDF(AFK,稍后会发布代码): 1. 定义UDF1‘updateToken’传递xml列值并返回更新后的xml值。 2. 具有帐户-令牌对映射的 HashMap 实例“mappedTokens”被设为静态。在我的 UDF 函数中访问它以在 xml 字符串中查找帐户并使用令牌进行更新。

我可以测试我的 applySwappedTokens 函数,该函数在数据集“withColumn”上调用上述 UDF。但是,当我运行 spark 程序时,我看到“mappedToken”有“null”数据,因此 xml 列会更新为空数据。我认为静态的“mappedTokens”要么在另一个 JVM 中,要么在驱动程序中(即使在本地,spark 也会创建隔离的驱动程序,执行程序)。令人沮丧的是,没有简单的解决方案来迭代和更新 spark 中的行。

Dataset<Row> processByRow(Dataset<Row> dsRecords, SparkSession sparkSession) 
        sparkSession.udf().register("updateToken", updateToken, DataTypes.StringType);          
        return ds = dsRecords.withColumn("eventRecordTokenText", callUDF("updateToken", dsRecords.col("eventRecordTokenText")));
            

static UDF1 updateToken = new UDF1<String, String>() 
        public String call(final String tokenText) throws Exception 
                // xml operations here..
                for (int nodeIndex = 0; nodeIndex < nList.getLength(); nodeIndex++) 
                    Node thisNode = nList.item(nodeIndex);
                    if (thisNode.getAttributes().getNamedItem("ProcessTokenValue") != null && thisNode.getAttributes()
                            .getNamedItem("ProcessTokenValue").getNodeValue().equalsIgnoreCase("true")) 
                        Node valueNode = thisNode.getAttributes().getNamedItem("Value");
                        String thisToken = valueNode.getNodeValue();
                        String newToken = mappedTokens.get(thisToken); // *returns null values from the map*
                        if(newToken != null && !newToken.isEmpty())
                        valueNode.setNodeValue(newToken);
                    
                
                // more xml operations here..
                return output;
        
    ;

更新 2 - 迭代和更新 现在,我正在尝试逐行遍历..

Dataset<Row> processByRow1(Dataset<Row> dsRecords, SparkSession sparkSession) 
        List<MongoRecordSmall> newRows = new ArrayList<MongoRecordSmall>();
            dsRecords.foreach((ForeachFunction<Row>) record -> 
            String currentToken = record.getAs(AppConstants.TokenCol);
            String newToken = mappedTokens.get(currentToken);
            newRows.add(new MongoRecordSmall(record.getString(0), record.getString(1), newToken, record.getString(3)));
            logger.error(“Size plus=“+newRows.size());
        );
       return sparkSession.createDataFrame(newRows, MongoRecordSmall.class);
    

这是引发序列化错误。似乎 (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html) 我的类存在上述逻辑,正在被序列化并发送到工作节点并且没有这样做。

【问题讨论】:

你能把代码贴在这里我们可以理解你在做什么。你得到了不可序列化的代码错误,因为你可能使用了不可序列化的类对象 HashMap转成dataframe会怎样?哪些 SQL 函数可用于更新 dsRecords? 在将 Map 转换为数据集后,我可以使用 JOIN 更新列。从 Map 迭代和更新没有成功。 【参考方案1】:

由于我没有找到更好的答案,我将用我实施的解决方案来回答我的问题(而且看起来很丑!)-

Dataset<Row> processByRowUpdate(Dataset<Row> dsRecords, SparkSession sparkSession) 
        List<Row> rows = dsRecords.select("accountIdentifier", "accountNumber").collectAsList();
List<MongoRecord> newRows = new ArrayList<MongoRecord>();
        ListIterator<Row> it = rows.listIterator();
        boolean errorOccurred = false;
        while (it.hasNext()) 
            try 
                Row record = it.next();
                MongoRecord mongo = new MongoRecord();
       if (!record.isNullAt(record.fieldIndex("accountIdentifier")))
                    mongo.setAccountIdentifier(String.valueOf(record.getDouble(record.fieldIndex("accountIdentifier"))));

           //... and so on
       newRows.add(mongo);
        catch (Exception exception) 
sparkSession.createDataFrame(newRows, MongoRecord.class);

【讨论】:

以上是关于如何遍历 spark 数据集并更新 Java 中的列值?的主要内容,如果未能解决你的问题,请参考以下文章

在 spark java 中取两个数据集并集的必要条件是啥

循环访问 Access 中的两个记录集并更新条目

C#,遍历数据集并显示数据集列中的每条记录

如何使用 Java 在 Dataset Spark 中过滤列并删除行

比较两个数据集并获取更改了哪些字段

Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据