使用 .NET for Spark 在数据帧中高效地填充数据

Posted

技术标签:

【中文标题】使用 .NET for Spark 在数据帧中高效地填充数据【英文标题】:Time Efficient gap filling data in dataframe using .NET for Spark 【发布时间】:2021-03-26 14:45:41 【问题描述】:

我想使用 .NET for Spark 来填补我的 DataFrame 中的空白。

当前DataFrame (rawData) 包含reportFromreportTo 之间一分钟间隔的数据

DateTime reportFrom = new DateTime(2021, 3, 4, 0, 0, 0);
DateTime reportTo = new DateTime(2021, 3, 5, 0, 0, 0);

缺少一些间隔,我想用最后一个已知值填充它们。

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
...
|2021|    3|  4|  22|     7|                87|               Power|               0.0|

第一步(插入缺失的分钟)后我期望的结果是:

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     6|              null|                null|              null|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
|2021|    3|  4|  14|     8|              null|                null|              null|
...
|2021|    3|  4|  23|    59|              null|                null|              null|               

到目前为止,我使用所有分钟创建一个新的DataFrame,然后在两个数据帧上执行left outer Join

int inc = 1;
List<DateTime> timeList = new List<DateTime>();
while (reportFrom < reportTo)

    timeList.Add(reportFrom);
    reportFrom = reportFrom.AddMinutes(inc);
    

var toFillTime0 = new List<object>  -1, 0, 0, 0, 0 ;

var dataToFill = spark.CreateDataFrame(
    new List<GenericRow>  new GenericRow(toFillTime0.ToArray()) ,
    new StructType(                     //shema
    new List<StructField>()
    
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
    ));

foreach (DateTime time in timeList)


    var toFillTime = new List<object>  time.Year, time.Month, time.Day, time.Hour, time.Minute ;

    var dataToFillt = spark.CreateDataFrame(
        new List<GenericRow>  new GenericRow(toFillTime.ToArray()) ,
        new StructType(                     //shema
        new List<StructField>()
        
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
        ));

    dataToFill = dataToFill.Union(dataToFillt);



dataToFill = dataToFill.Filter("Year0 > 0");    

var toFillReportDataReq = dataToFill.Join(rawData,
                dataToFill["Year0"] == update10["Year"] & dataToFill["Month0"] == update10["Month"] & dataToFill["Day0"] == update10["Day"]
                & dataToFill["Hour0"] == update10["Hour"] & dataToFill["Minute0"] == update10["Minute"], "left_outer");    

toFillReportDataReq 的几行如下所示:

|2021|    3|  4|  22|     4|                87|               Power|               0.0|
|2021|    3|  4|  22|     5|                87|               Power|               0.0|
|2021|    3|  4|  22|     6|                87|               Power|               0.0|
|2021|    3|  4|  22|     7|                87|               Power|               0.0|
|2021|    3|  4|  22|     8|              null|                null|              null|
|2021|    3|  4|  22|     9|              null|                null|              null|
|2021|    3|  4|  22|    10|              null|                null|              null|
|2021|    3|  4|  22|    11|              null|                null|              null|
|2021|    3|  4|  22|    12|              null|                null|              null|
|2021|    3|  4|  22|    13|              null|                null|              null|
|2021|    3|  4|  22|    14|              null|                null|              null|

Values 列中空值的替换已经使用windowlast 函数进行了介绍。

IdType 列中的值替换为 var id = 87 和“电源”使用

toFillReportDataReq = toFillReportDataReq.WithColumn("Id", Functions.When(toFillReportDataReq["Id"].IsNull(), id)
   .Otherwise(toFillReportDataReq["Id"]))
   .WithColumn("Type", Functions.When(toFillReportDataReq["Type"].IsNull(), "Power")
    .Otherwise(toFillReportDataReq["Type"]));

这个方法返回我想要的结果,但是非常耗时(效率低下)。

我的问题如下:

是否有更充分的方法来创建一个新的DataFrame,其中包含指定间隔之间的所有分钟数? 有什么办法可以避免Join这个方法? 将 Id 列中的所有值定义为 id 并将 Type 列中的所有值定义为“Power”的最佳方法是什么?

谢谢!

【问题讨论】:

我现在不在电脑附近,所以明天尝试做一个例子。如果是我,我会找出我需要投影多少分钟(以分钟为单位的日期差异),然后执行 spark.Range(numberOfMinutes)。将其左外连接到您的数据框和 WithColumn id 从范围和日期添加分钟开始时间到该 id - 如果您可以避免在循环中调用 spark 那么它会更有效。 感谢您的建议!我会按照建议尝试。如果您能发布您的解决方案,我将不胜感激。 【参考方案1】:

这是我将采取的方法:

    构建一个 DataFrame,每分钟显示一行(我使用 spark.Range 为我需要的每一分钟投影一行) 对于 Range 中的每个 ID,将开始日期添加一分钟 使用 left_outer 连接将日期加入原始数据框,这样您就不会丢失任何行 然后使用 Last 来填补任何空白 - 请注意,如果您以 null 开头,则 newValue 将为 null,直到您获得一些数据
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

namespace ***

    class Program
    
        static void Main(string[] args)
        
            var spark = SparkSession.Builder().GetOrCreate();
            
            //Sample data set - we will fill in the missing minutes
            var df = spark.CreateDataFrame(new List<GenericRow>()
            
                new GenericRow(new object[] 2021, 3, 4, 8, 3, 87, "Type1", 380.5),
                new GenericRow(new object[] 2021, 3, 4, 8, 10, null, null, null),
                new GenericRow(new object[] 2021, 3, 4, 8, 20, null, null, null),
                new GenericRow(new object[] 2021, 3, 4, 8, 25, null, null, null),
                new GenericRow(new object[] 2021, 3, 4, 8, 35, 87, "Type1", 0.0),
                new GenericRow(new object[] 2021, 3, 4, 8, 45, 87, "Type1", 0.0)
            , new StructType(new List<StructField>()
            
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("ID", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType()),
            ));
            
            //start and end time
            var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
            var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);
            
            //convert start time to unix epoch as we can't pass a DateTime to spark (yet!)
            var unixFromTime = (reportFrom - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds;
            
            //how many total rows do we need?
            var minutesToCreate = reportTo.Subtract(reportFrom).TotalMinutes;
            
            //create a dataframe with 1 row for every minute we need
            var everyMinute = spark.Range((long) minutesToCreate);
            
            //Add the reportFrom unix epoch
            everyMinute = everyMinute.WithColumn("BaseTime", Functions.Lit(unixFromTime));
            
            //add to the unix epoch, the Id (incrementing number) multiplied by 60 - if we didn't mul(60) it would add seconds and not minutes
            everyMinute = everyMinute.WithColumn("Time",
                Functions.Lit(unixFromTime)
                    .Plus(Functions.Col("Id").Cast("Int").Multiply(Functions.Lit(60))));
            
            //convert the unix epoch to an actual timestamp and drop all the intermediate columns
            everyMinute = everyMinute.WithColumn("Date",
                Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time")))).Select("Date");
                
            //convert timestamp into individual columns

            everyMinute = everyMinute.WithColumn("Year", Functions.Year(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Month", Functions.Month(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Hour", Functions.Hour(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Minute", Functions.Minute(Functions.Col("Date")));

            //join both data frames so...
            var dfAllData = everyMinute.Join(df, new List<string>() "Year", "Month", "Day", "Hour", "Minute", "left_outer");
            
            //add in data using Last
            var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
            var filledDataFrame = dfAllData.WithColumn("newValue",
                Functions.When(dfAllData["Value"].IsNull(),
                        Functions.Last(dfAllData["Value"], true).Over(window))
                    .Otherwise(dfAllData["Value"]));

            filledDataFrame.Show(1000, 10000);
        
    

编辑

【讨论】:

谢谢,您的解决方案完美运行。我只有一个关于前向填充的问题。假设DataFrame 中有更多的Ids(大约100 个不同的Ids)和更多的Types(power1、power2、power3,...)。如何填写每个Id 和每个Type 的每日分钟数?到目前为止,我过滤它们以获得一个 IdType,但随后我们得到了一种我们希望避免的 for 循环。有没有办法为多个 Ids 和 Types 添加分钟数? 嗨@V.J。我将创建第三个数据框,其中包含所有可能的 ID 和类型 - 然后对 everyMinute 数据框进行交叉连接,然后在本例中执行 left_outer 感谢您的建议。会试试的。

以上是关于使用 .NET for Spark 在数据帧中高效地填充数据的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中使用 LSH 对数据帧中的每个点运行最近邻查询

删除Spark数据帧中具有句点的列名称

如何确定 Apache Spark 数据帧中的分区大小

Spark:如何重用在数据帧中定义了所有字段的相同数组模式

控制 spark-sql 和数据帧中的字段可空性

Spark 2.0 将 json 读入带有引号的数据帧中 - 与 spark 1.6 不同的行为......错误?