spark 2.x 正在使用 csv 函数将整数/双列作为字符串读取

Posted

技术标签:

【中文标题】spark 2.x 正在使用 csv 函数将整数/双列作为字符串读取【英文标题】:spark 2.x is reading integer/double column as string using csv function 【发布时间】:2017-08-31 09:34:39 【问题描述】:

我正在使用以下语句在 spark 中读取 csv。

df = spark.read.csv('<CSV FILE>', header=True, inferSchema = True)

当我在 spark 数据帧中检查时,一些整数和双列在数据帧中存储为字符串列。但是,并非所有列都如此。

我检查了特定列的值,所有值都是双精度类型,但 spark 仍然推断为 StringType。

由于我正在加载包含大约 1000 列的 CSV 文件,因此也无法明确指定架构。

任何建议/帮助将不胜感激。

问候,

尼拉杰

【问题讨论】:

您可以稍后将其转换为所需的数据类型。 df.withColumn("a", col("a").cast(DecimalType(10,2) ) 或其他。 请添加未正确解析的示例数据 我可以显式地转换列,但为了做到这一点,我需要将 1000 列与它们在 spark 中的实际数据类型和数据类型进行比较。但是,我不想做这个练习,否则我需要一次又一次地做这个耗时的练习。我无法在此处附加示例数据。但是,我已经检查了包含它作为字符串的所有双精度值的列。我的 double 值的值最多为小数点后 6 位。这会是个问题吗?如果是,我们如何在 spark 中读取 csv 时指定十进制数。 【参考方案1】:

考虑到架构不会更改中间表(浮动将保持浮动通过行)。 您可以编写一个小脚本来自动投射它们:

def isfloat(x):
    try:
        float(x)
    except :
        return False
    else:
        return True
line1 = df.head(1)[0]

df = df.select([c for c in df.columns if not isfloat(line1[c])] + [df[c].cast("float").alias(c) for c in df.columns if isfloat(line1[c])])

如果你觉得第一行信息不够,可以这样做

N = 10
def isfloat(c):
    try:
        [float(x) in c]
    except :
        return False
    else:
        return True
Nlines = df.limit(N).toPandas()
df = df.select([c for c in df.columns if not isfloat(Nlines[c])] + [df[c].cast("float").alias(c) for c in df.columns if isfloat(Nlines[c])])

【讨论】:

如果第一行有空怎么办? :) 或者第 1-10 行是浮点型的,但 20-30 是随机字符串? 他说问题不在于列中数据类型的变化,这就是我只取第一行的原因。这也是 spark 所做的,它从前 N 行推断模式,如果所有这些值都是 NULL,则可能导致错误。您始终可以修改函数以循环前 N 行,以防第一个值为 NULL。

以上是关于spark 2.x 正在使用 csv 函数将整数/双列作为字符串读取的主要内容,如果未能解决你的问题,请参考以下文章

使用跳过行在 Spark 中读取 csv

将包含 Vector 作为特征的 Spark 数据帧转换为 CSV 文件

Apache Spark 2.0 - date_add 函数

Spark CSV读取 忽略字符

使用 python Spark 将大型 CSV 发送到 Kafka

当函数在具有自动检测模式的 spark 数据帧中不起作用时