如何在从数据集中选择列的不同值时保持数据的顺序
Posted
技术标签:
【中文标题】如何在从数据集中选择列的不同值时保持数据的顺序【英文标题】:How to maintain the order of the data while selecting the distinct values of column from Dataset 【发布时间】:2019-03-12 07:57:24 【问题描述】:我有一个如下的数据集,
+------+------+---------------+
| col1 | col2 | sum(costs) |
+------+------+---------------+
| 1 | a | 3555204326.27 |
| 4 | b | 22273491.72 |
| 5 | c | 219175.00 |
| 3 | a | 219175.00 |
| 2 | c | 75341433.37 |
+------+------+---------------+
我需要选择 col1 的不同值,我的结果数据集的顺序应为 1、4、5、3、2(这些值在初始数据集中可用的顺序)。但订单正在被洗牌。有什么方法可以保持与初始数据集相同的顺序。 Spark/SQL 中的任何建议都可以。
这个数据集可以通过spark中的以下序列获得。
df = sqlCtx.createDataFrame(
[(1, a, 355.27), (4, b, 222.98), (5, c, 275.00), (3, a, 25.00),
(2, c, 753.37)], ('Col1', 'col2', 'cost'));
【问题讨论】:
您能否在问题中添加数据来自何处(CSV、数据库)、源是否不可变以及您使用的是哪个版本的 Spark... 【参考方案1】:您可以添加包含每行索引的另一列,然后在“distinct”之后对该列进行排序。这是一个例子:
import org.apache.spark.sql.functions._
val df = Seq(1, 4, 4, 5, 2)
.toDF("a")
.withColumn("id", monotonically_increasing_id())
df.show()
// +---+---+
// | a| id|
// +---+---+
// | 1| 0|
// | 4| 1|
// | 4| 2|
// | 5| 3|
// | 2| 4|
// +---+---+
df.dropDuplicates("a").sort("id").show()
// +---+---+
// | a| id|
// +---+---+
// | 1| 0|
// | 4| 1|
// | 5| 3|
// | 2| 4|
// +---+---+
请注意,要对 1 个特定列进行区分,您可以使用 dropDuplicates
,如果您想控制在重复的情况下要采用哪一行,请使用 groupBy
。
【讨论】:
【参考方案2】:假设您正在尝试远程处理 col2
中的重复项(因为 col1
中没有),那么最终结果将是:
+----+----+---------------+
|col1|col2| sum|
+----+----+---------------+
| 1| a|3.55520432627E9|
| 4| b| 2.227349172E7|
| 5| c| 219175.0|
+----+----+---------------+
您可以添加如下索引列:
df = df.withColumn("__idx", monotonically_increasing_id());
然后做所有你想要的转换,然后放下它,比如:
df = df.dropDuplicates("col2").orderBy("__idx").drop("__idx");
这意味着做:
第 1 步:加载数据和内容:
+----+----+---------------+
|col1|col2| sum|
+----+----+---------------+
| 1| a|3.55520432627E9|
| 4| b| 2.227349172E7|
| 5| c| 219175.0|
| 3| a| 219175.0|
| 2| c| 7.534143337E7|
+----+----+---------------+
第二步:添加索引:
+----+----+---------------+-----+
|col1|col2| sum|__idx|
+----+----+---------------+-----+
| 1| a|3.55520432627E9| 0|
| 4| b| 2.227349172E7| 1|
| 5| c| 219175.0| 2|
| 3| a| 219175.0| 3|
| 2| c| 7.534143337E7| 4|
+----+----+---------------+-----+
第 3 步:转换(此处删除 col2
中的 dups)并删除 __idx
列:
+----+----+---------------+
|col1|col2| sum|
+----+----+---------------+
| 1| a|3.55520432627E9|
| 4| b| 2.227349172E7|
| 5| c| 219175.0|
+----+----+---------------+
Java 代码可以是:
package net.jgp.books.spark.ch12.lab990_others;
import static org.apache.spark.sql.functions.monotonically_increasing_id;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* Keeping the order of rows during transformations.
*
* @author jgp
*/
public class KeepingOrderApp
/**
* main() is your entry point to the application.
*
* @param args
*/
public static void main(String[] args)
KeepingOrderApp app = new KeepingOrderApp();
app.start();
/**
* The processing code.
*/
private void start()
// Creates a session on a local master
SparkSession spark = SparkSession.builder()
.appName("Splitting a dataframe to collect it")
.master("local")
.getOrCreate();
Dataset<Row> df = createDataframe(spark);
df.show();
df = df.withColumn("__idx", monotonically_increasing_id());
df.show();
df = df.dropDuplicates("col2").orderBy("__idx").drop("__idx");
df.show();
private static Dataset<Row> createDataframe(SparkSession spark)
StructType schema = DataTypes.createStructType(new StructField[]
DataTypes.createStructField(
"col1",
DataTypes.IntegerType,
false),
DataTypes.createStructField(
"col2",
DataTypes.StringType,
false),
DataTypes.createStructField(
"sum",
DataTypes.DoubleType,
false) );
List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create(1, "a", 3555204326.27));
rows.add(RowFactory.create(4, "b", 22273491.72));
rows.add(RowFactory.create(5, "c", 219175.0));
rows.add(RowFactory.create(3, "a", 219175.0));
rows.add(RowFactory.create(2, "c", 75341433.37));
return spark.createDataFrame(rows, schema);
【讨论】:
【参考方案3】:你可以在你的数据库中添加一个索引列,然后在你的 SQL 请求中创建一个 ORDER BY id
【讨论】:
【参考方案4】:我相信您需要重新格式化您的查询并使用 group by 而不是 distinct 像这个答案建议 SQL: How to keep rows order with DISTINCT?
【讨论】:
以上是关于如何在从数据集中选择列的不同值时保持数据的顺序的主要内容,如果未能解决你的问题,请参考以下文章
比较两个(py)spark sql数据框并在保持连接列的同时有条件地选择列数据