PySpark:如何用逗号指定列作为十进制
Posted
技术标签:
【中文标题】PySpark:如何用逗号指定列作为十进制【英文标题】:PySpark: How to specify column with comma as decimal 【发布时间】:2018-10-08 12:43:48 【问题描述】:我正在使用 PySpark 并加载一个 csv
文件。我有一列包含欧洲格式的数字,这意味着逗号替换了点,反之亦然。
例如:我有2.416,67
而不是2,416.67
。
My data in .csv file looks like this -
ID; Revenue
21; 2.645,45
23; 31.147,05
.
.
55; 1.009,11
在 pandas 中,通过在 pd.read_csv()
中指定 decimal=','
和 thousands='.'
选项以读取欧洲格式,可以轻松读取此类文件。
熊猫代码:
import pandas as pd
df=pd.read_csv("filepath/revenues.csv",sep=';',decimal=',',thousands='.')
我不知道如何在 PySpark 中做到这一点。
PySpark 代码:
from pyspark.sql.types import StructType, StructField, FloatType, StringType
schema = StructType([
StructField("ID", StringType(), True),
StructField("Revenue", FloatType(), True)
])
df=spark.read.csv("filepath/revenues.csv",sep=';',encoding='UTF-8', schema=schema, header=True)
谁能建议我们如何使用上面提到的.csv()
函数在 PySpark 中加载这样的文件?
【问题讨论】:
为什么要指定分号分隔符?您提供的示例文件看起来是以空格分隔的,可能带有制表符? 这只是一个例子。好的,让我改一下。 【参考方案1】:由于数据的格式,您将无法将其读取为浮点数。您需要将其读取为字符串,将其清理干净,然后转换为浮点数:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import FloatType
df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = df.withColumn('revenue', regexp_replace('revenue', '\\.', ''))
df = df.withColumn('revenue', regexp_replace('revenue', ',', '.'))
df = df.withColumn('revenue', df['revenue'].cast("float"))
您也可以将这些链接在一起:
df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = (
df
.withColumn('revenue', regexp_replace('revenue', '\\.', ''))
.withColumn('revenue', regexp_replace('revenue', ',', '.'))
.withColumn('revenue', df['revenue'].cast("float"))
)
请注意,我没有对此进行测试,因此可能有一两个错字。
【讨论】:
您好,非常感谢您的回复。好吧,我也在考虑它,但我认为可能有一个功能可以做到这一点。我会搜索它,如果我没有找到它,我会添加它作为答案。 AFAIK 没有功能可以仅使用.csv()
方法来执行此操作。为什么你有这个要求?
您好,我没有这个要求。只是想使用一个内置函数(如果有的话)。但是,是的,带有替换的正则表达式就足够了:)
这是否解决了您的问题?如果可以,问题能否结束?
嗨 jhole89,只需替换 '.'和 '\\。'在第一个正则表达式中,因为 DOT 匹配正则表达式中的所有内容并将 FloatType 更改为“float”以使此代码正常工作。除此之外,代码运行良好。【参考方案2】:
如果您的数据集有很多浮点列,但数据集的大小仍然足够小,可以先使用 pandas 对其进行预处理,我发现执行以下操作会更容易。
import pandas as pd
df_pandas = pd.read_csv('yourfile.csv', sep=';', decimal=',')
df_pandas.to_csv('yourfile__dot_as_decimal_separator.csv', sep=';', decimal='.') # optionally also header=True of course.
df_spark = spark.csv.read('yourfile__dot_as_decimal_separator.csv', sep=';', inferSchema=True) # optionally also header=True of course.
我确实发现 jhole89 的答案非常有用,但发现将它应用于具有很多列(数百列)的数据集很痛苦。
我的意思是:
手动指定浮点列并对其进行转换非常费力, 尝试通过检查哪些列是字符串类型并包含逗号来动态查找它们,避免不考虑带有毫秒分隔符的日期时间列等,在某些列上强制转换为浮点数,因为它们是包含文本的逗号,但不打算被解析为浮点数:这会导致头痛。因此,如果有多个浮点列并且你的数据集可以用pandas进行预处理,你可以应用上面的代码。
【讨论】:
【参考方案3】:确保您的 SQL 表已预先格式化为读取 NUMERIC 而不是 INTEGER。我在试图弄清楚所有关于编码以及点和逗号等的不同格式时遇到了很大的麻烦。最后,问题更加原始,它被预先格式化为只读整数,因此无论使用逗号还是点,都不会接受小数。然后我只需要更改我的 SQL 表来接受实数 (NUMERIC) 就可以了。
【讨论】:
请发布一些代码来激发您的回答。到目前为止,经过和很多人讨论,我还没有找到任何方法将European/German
格式的数字导入PySpark
。 @jhole89 提供的答案是大多数人所做的。如果您找到了一种方法,请通过发布必要的代码来证明这一点。非常感谢。
我的表看起来像这样: CREATE TABLE PUBLIC.ECOSTAT ( DEST CHAR(50), AIRLINE CHAR(50), SHP_WEIGHT INTEGER );我正在尝试使用以下格式导入 SHP_WEIGHT 的数据:122.5、2500.5、750.3(这是英文格式,点而不是逗号)。问题不在于我有一台德国服务器,而在于我如何创建表。我已将变量 SHP_WEIGHT 创建为 INTEGER,并且整数不采用十进制数。然后我修改了表格并重新创建了 SHP_WEIGHT NUMERIC(它接受像 122.5 这样的实数)并且它起作用了!
我认为你完全没有抓住重点。参考您的示例,我的数据将类似于 122,4; 2500,5; 750,3。 PySpark 中没有可以处理此问题的 DataType - spark.apache.org/docs/2.1.2/api/python/_modules/pyspark/sql/… 我们必须将其作为字符串导入 Schema 中,然后转换为正确的英式格式,然后转换为 float/int。这就是@jhole89 在他的回答中所暗示的。感谢您的努力。以上是关于PySpark:如何用逗号指定列作为十进制的主要内容,如果未能解决你的问题,请参考以下文章
Inferschema 检测列作为字符串而不是来自 pyspark 中镶木地板的双精度