如何在 Spark SQL 中向现有 Dataframe 添加新列

Posted

技术标签:

【中文标题】如何在 Spark SQL 中向现有 Dataframe 添加新列【英文标题】:How is it possible to add new column to existing Dataframe in Spark SQL 【发布时间】:2015-08-21 08:57:04 【问题描述】:

我使用DataFrame API。

我有现有的 DataFrame 和一个 List 对象(也可以使用 Array)。如何将此列表作为新列添加到现有 DataFrame 中?我应该为此使用 Column 类吗?

【问题讨论】:

【参考方案1】:

您可能应该将您的 List 转换为单个 Column RDD 并由您在 critetia pickeg 上应用 join。简单的 DataFrame 转换:

 val df1 = sparkContext.makeRDD(yourList).toDF("newColumn")

如果您需要创建额外的列来执行连接,您可以添加更多列,映射您的列表:

val df1 = sparkContext.makeRDD(yourList).map(i => (i, fun(i)).toDF("newColumn", "joinOnThisColumn")

我不熟悉Java版本,但你应该尝试使用JavaSparkContext.parallelize(yourList),并基于this doc应用类似的映射操作。

【讨论】:

好的,谢谢,我会试试你的解决方案。但是我在 API Java 中也发现了一些函数,而不是 Scala。非常感谢。【参考方案2】:

对不起,是我的错,我已经找到函数withColumn(String colName, Column col) 应该可以解决我的问题

【讨论】:

withColumn 的唯一问题是很难从列表中按顺序提取元素并将它们添加到选定的行中。如果你有办法做到这一点,这种方式可能会更好,但你的问题是笼统地说;) 为什么,我首先将我的 List 转换为 Column 对象,然后像第二个函数参数一样添加它。不好吗?... 有趣。请在完成后发布你是如何做到的。 @Niemand,主要问题是从 ArrayList 到 Column 对象的强制转换(转换),不会。你之前是不是这个意思? 是的,这或多或少是我在想的。我以为你找到了克服这个问题的方法。【参考方案3】:

这是一个示例,其中我们有一个日期列,并希望添加另一个包含月份的列。

Dataset<Row> newData = data.withColumn("month", month((unix_timestamp(col("date"), "MM/dd/yyyy")).cast("timestamp")));

希望这会有所帮助!

干杯!

【讨论】:

【参考方案4】:

这个线程有点旧,但我在使用 Java 时遇到了类似的情况。我认为最重要的是,对于我应该如何解决这个问题存在概念上的误解。

为了解决我的问题,我创建了一个简单的 POJO 来协助数据集的新列(而不是尝试在现有列上构建)。我认为从概念上讲,我不明白最好在需要添加附加列的初始读取期间生成数据集。我希望这对将来的某人有所帮助。

考虑以下几点:

        JavaRDD<MyPojo> myRdd = dao.getSession().read().jdbc("jdbcurl","mytable",someObject.getProperties()).javaRDD().map( new Function<Row,MyPojo>() 

                       private static final long serialVersionUID = 1L;

                       @Override
                       public MyPojo call(Row row) throws Exception 
                       Integer curDos = calculateStuff(row);   //manipulate my data

                       MyPojo pojoInst = new MyPojo();

                       pojoInst.setBaseValue(row.getAs("BASE_VALUE_COLUMN"));
                       pojoInst.setKey(row.getAs("KEY_COLUMN"));
                       pojoInst.setCalculatedValue(curDos);

                       return pojoInst;
                      
                    );

         Dataset<Row> myRddRFF = dao.getSession().createDataFrame(myRdd, MyPojo.class);

//continue load or other operation here... 

【讨论】:

以上是关于如何在 Spark SQL 中向现有 Dataframe 添加新列的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Ruby 中向 RDoc 添加现有注释? [关闭]

如何在 power bi 中向现有数据添加小计行

如何在 pyspark aws emr 中向现有数据框添加多列?

如何在 sse 中实现有符号定点数学中向零的衰减?

在迁移中向现有表添加新列

如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)