在 PySpark 中将 ArrayType(StringType()) 的列转换为 ArrayType(DateType())

Posted

技术标签:

【中文标题】在 PySpark 中将 ArrayType(StringType()) 的列转换为 ArrayType(DateType())【英文标题】:Convert Column of ArrayType(StringType()) to ArrayType(DateType()) in PySpark 【发布时间】:2020-12-11 22:45:13 【问题描述】:

我有一个如下数据框,我想将其转换为 ISO-8601:

|     production_date        |       expiration_date         |
--------------------------------------------------------------
|["20/05/1996","01/01/2018"] | ["15/01/1997","27/03/2019"]   |
| ....                         ....                          |
--------------------------------------------------------------

我想要:

|     good_prod_date        |       good_exp_date         |
-------------------------------------------------------------
|[1996-05-20,2018-01-01]    | [1997-01-01,2019-03-27]     |
| ....                         ....                       |
-------------------------------------------------------------

但是,有超过 20 列和数百万行。我试图避免使用 UDF,因为它们效率低下,而且大多数时候是一种糟糕的方法。我也避免爆炸每一列,因为那是:

    效率低下(不必要地创建了数亿行) 不是一个优雅的解决方案 我试过了,还是不行

到目前为止,我有以下内容:

def explodeCols(df):
  return (df
          .withColumn("production_date", sf.explode("production_date"))
          .withColumn("expiration_date", sf.explode("expiration_date")))

def fixTypes(df):
  return (df
          .withColumn("production_date", sf.to_date("production_date", "dd/MM/yyyy"))
          .withColumn("expiration_date", sf.to_date("expiration_date", "dd/MM/yyyy")))

def consolidate(df):
  cols = ["production_date", "expiration_date"]
  return df.groupBy("id").agg(*[sf.collect_list(c) for c in cols])

historyDF = (df
             .transform(explodeCols)
             .transform(fixTypes)
             .transform(consolidate))

但是,当我在 DataBricks 上运行此代码时,作业永远不会执行,事实上,它会导致执行程序失败/死机(这不好)。

我尝试的另一个解决方案如下:

df.withColumn("good_prod_date", col("production_date").cast(ArrayType(DateType())))

但我得到的结果是一个空数组:

|     production_date        |       good_prod_date         |
-------------------------------------------------------------
|["20/05/1996","01/01/2018"] | [null,null]                  |
| ....                         ....                         |
-------------------------------------------------------------

【问题讨论】:

火花版?? 我使用的是 Spark 3.0.0 UDFs 在这种情况下效率很高,您是否尝试过比较性能?在不爆炸数组的情况下,将值传递给 udf 并在 udf 中操作日期。 【参考方案1】:

使用pyspark.sql.function.transform 高阶函数而不是explode 函数来转换数组中的每个值。

df
.withColumn("production_date",F.expr("transform(production_date,v -> to_date(v,'dd/MM/yyyy'))"))
.withColumn("expiration_date",F.expr("transform(expiration_date,v -> to_date(v,'dd/MM/yyyy'))"))
.show()
df.withColumn("good_prod_date", col("production_date").cast(ArrayType(DateType())))

这不起作用,因为production_date 具有不同的日期格式,如果此列具有像yyyy-MM-dd 这样的日期格式,则转换将起作用。

df.select("actual_date").printSchema()
root
 |-- actual_date: array (nullable = true)
 |    |-- element: string (containsNull = true)

df.select("actual_date").show(false)
+------------------------+
|actual_date             |
+------------------------+
|[1997-01-15, 2019-03-27]|
+------------------------+
df.select("actual_date").withColumn("actual_date", F.col("actual_date").cast("array<date>")).printSchema()
root
 |-- actual_date: array (nullable = true)
 |    |-- element: date (containsNull = true)

df.select("actual_date").withColumn("actual_date", F.col("actual_date").cast("array<date>")).show()
+------------------------+
|actual_date             |
+------------------------+
|[1997-01-15, 2019-03-27]|
+------------------------+

【讨论】:

这很有趣,你能不能给我指出一篇解释pyspark.sql.function.transformspark.sql.function.transform之间区别的文章或博文? 我已经修改了它的实际pyspark.sql.function.transform。在 Scala 中,您必须使用 spark.sql.function.transform 导入 transform 函数 我不相信它是pyspark.sql.function 类,这就是为什么我们只能在F.expr(...) 中使用它。换句话说,我们不能做F.transform()(这是为Scala保留的,对吧?)。

以上是关于在 PySpark 中将 ArrayType(StringType()) 的列转换为 ArrayType(DateType())的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中将字符串列转换为ArrayType

如何在 PySpark 中将字符串转换为字典 (JSON) 的 ArrayType

如何在pyspark中将字符串值转换为arrayType

如何在 Pyspark 中将 ArrayType 的列转换为 Dictionary

Pyspark - 循环通过 structType 和 ArrayType 在 structfield 中进行类型转换

在pyspark中创建带有arraytype列的数据框