所有列的 Pyspark 数据框数据类型由 UDF 更改为 String
Posted
技术标签:
【中文标题】所有列的 Pyspark 数据框数据类型由 UDF 更改为 String【英文标题】:Pyspark dataframe data type for all columns changed to String by UDF 【发布时间】:2018-01-22 13:19:39 【问题描述】:我有一个数据框,其中有几个列,例如 email_address(String)、paid(integer) 和 date(datetime type)
我正在运行如下所述的 UDF:
from pyspark.sql.functions import udf, col
def conv(column):
date_format='%m/%d/%Y'
a = None
if column:
try:
a= datetime.strptime(str(column),'%Y-%m-%d').strftime(date_format)
print("Inside Try")
except:
a = column
print("Inside except")
return a
conv_func = udf(conv)
df_new = date_df.select(*(conv_func(col(c)).alias(c) for c in date_df.columns))
所以在 df_new 中,我期待 email_address(String)、paid(integer) 和 date(string) 格式从 YYYY 改变-MM-DD 到 MM/DD/YYYY 。
问题是 UDF 确实转换了格式,但也将 paid 列的数据类型从整数 更改为我没有预料到的字符串。
我想知道如何使用这个 UDF 来避免这种情况。
【问题讨论】:
为什么是from pyspark.sql.types import StringType
以及您的 udf
类型在哪里?例如。 udf(conv, StringType())
因为我不希望类型是严格的 StringType 我没有提到 udf 类型。正如您会注意到的那样,数据框也具有整数类型,我不想将该列强制转换为字符串。
我已删除未使用的导入。
不幸的是,udf必须有一个类型,默认情况下,如果你不指定它,它将是“StringType”src
这没有任何意义。为什么要将日期格式应用于电子邮件。只需在您实际要转换的列上使用withColumn
。并且不要使用udf
。 SQL 函数可以轻松完成。
【参考方案1】:
这是您尝试的一种方法。
使用函数来测试列是否为日期,并且仅将转换应用于这些。
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
def conv(column):
date_format='%m/%d/%Y'
a = datetime.strptime(str(column),'%Y-%m-%d').strftime(date_format)
return a
def is_date(column):
try:
conv(column)
return True
except:
return False
conv_func = udf(conv)
date_udf = udf(is_date, BooleanType())
df_new = date_df.select(
*(conv_func(col(c)).alias(c) if date_udf(col(c)) else col(c) for c in date_df.columns)
)
我还没有测试过这个(伪)代码(如果你提供了MCVE 会有所帮助),但我认为这种方法应该可以工作。
【讨论】:
【参考方案2】:要更改日期列的格式,您可以使用 pyspark sql 函数中的 date_format。我创建了示例数据并进行了测试,
>>> for pyspark.sql import functions as F
>>> l=[('2018-01-22','id1',123,'2018-01-21'),('2018-01-22','id2',234,'2018-01-21'),('2018-01-22','id3',345,'2018-01-21'),('2018-01-22','id2',456,'2018-01-21')]
>>> df = spark.createDataFrame(l,['date1','id','value','date2'])
>>> df = df.select(df.date1.cast('date'),'id','value',df.date2.cast('date'))
>>> df.printSchema()
root
|-- date1: date (nullable = true)
|-- id: string (nullable = true)
|-- value: long (nullable = true)
|-- date2: date (nullable = true)
>>> df.show()
+----------+---+-----+----------+
| date1| id|value| date2|
+----------+---+-----+----------+
|2018-01-22|id1| 123|2018-01-21|
|2018-01-22|id2| 234|2018-01-21|
|2018-01-22|id3| 345|2018-01-21|
|2018-01-22|id2| 456|2018-01-21|
+----------+---+-----+----------+
>>> dcols,cols = [],[]
>>> for x in df.schema.fields:
... if repr(x.dataType) == 'DateType':
... dcols.append(x.name)
... else:
... cols.append(x.name)
...
>>> dcols
['date1', 'date2']
>>> cols
['id', 'value']
>>> df.select([F.date_format(c,'MM/dd/yyy').alias('%s'%c) for c in dcols]+cols).show()
+----------+----------+---+-----+
| date1| date2| id|value|
+----------+----------+---+-----+
|01/22/2018|01/21/2018|id1| 123|
|01/22/2018|01/21/2018|id2| 234|
|01/22/2018|01/21/2018|id3| 345|
|01/22/2018|01/21/2018|id2| 456|
+----------+----------+---+-----+
## If you still want to use UDF
>>> from datetime import datetime
>>> def conv(column):
... date_format='%m/%d/%Y'
... a = datetime.strptime(str(column),'%Y-%m-%d').strftime(date_format)
... return a
...
>>> conv_func = F.udf(conv)
>>> df.select([conv_func(F.col(x)).alias('%s'%x) for x in dcols]+cols).show()
+----------+----------+---+-----+
| date1| date2| id|value|
+----------+----------+---+-----+
|01/22/2018|01/21/2018|id1| 123|
|01/22/2018|01/21/2018|id2| 234|
|01/22/2018|01/21/2018|id3| 345|
|01/22/2018|01/21/2018|id2| 456|
+----------+----------+---+-----+
希望这会有所帮助。
【讨论】:
以上是关于所有列的 Pyspark 数据框数据类型由 UDF 更改为 String的主要内容,如果未能解决你的问题,请参考以下文章