基于pyspark中的键有效地推断数据帧模式

Posted

技术标签:

【中文标题】基于pyspark中的键有效地推断数据帧模式【英文标题】:Efficiently inferring dataframe schema based on key in pyspark 【发布时间】:2016-02-20 18:16:59 【问题描述】:

我有大量的 json 行(行)数据集。这些行有几个字段,存在的字段取决于该行中的一个 json 字段。这是一个小例子:

%pyspark
data = sc.parallelize(['key':'k1','a':1.0,'b':2.0,
                    'key':'k1','a':1.0,'b':20.0,
                    'key':'k1','a':100.0,'b':.2,
                    'key':'k2','y':10.0,'z':20.0,
                    'key':'k2','y':1.0,'z':250.0,
                    'key':'k1','a':1.0,'b':2.0,], 2)

我的目标是将这些数据放入 Dataframe 中,而无需指定架构。 Pyspark 有(至少)两个函数来帮助解决这个问题:1)toDF(),它只将第一行数据作为模式和 2)sqlContext.createDataFrame(),您可以在其中指定要采样的行的比例以便推断架构。例如:

data.toDF().show()
+-----+----+---+
|    a|   b|key|
+-----+----+---+
|  1.0| 2.0| k1|
|  1.0|20.0| k1|
|100.0| 0.2| k1|
| null|null| k2|
| null|null| k2|
|  1.0| 2.0| k1|
+-----+----+---+

sqlContext.createDataFrame(data,samplingRatio=1).show()
+-----+----+---+----+-----+
|    a|   b|key|   y|    z|
+-----+----+---+----+-----+
|  1.0| 2.0| k1|null| null|
|  1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
|  1.0| 2.0| k1|null| null|
+-----+----+---+----+-----+

sqlContext.createDataFrame() 做我想做的事,但由于我在 40 亿行中可能只有五个键,我认为必须有一种更快的方法来推断模式。另外,有些键非常稀有,所以我无法将samplingRatio 变小。

鉴于只有少数行类型,是否有一种优雅而快速的方式来推断架构?

【问题讨论】:

首先,使用字典推断模式在 1.3.0 中已被弃用,因此它确实不是一个好的选择。通常,如果您不想自己提供架构,那么您不能做得比全面扫描更好。想想看——即使你检查了 N-1 个元素,也不能保证第 N 个元素不会包含额外的字段。 很高兴了解字典模式推断弃用。鉴于此,什么是更好的方法?我发布了一个答案,该答案使用了基于 key 键值的唯一模式的辅助信息,这似乎是加快速度的诀窍。 如果输入真的是 JSON,那么直接将数据传递给 JSON 阅读器,而不需要从 Python 进行管道传输。或传递架构。它不仅可以作为性能提升器,还可以作为验证。 如果是这样,我怀疑您通常可以做得更好,特别是如果您有非常稀有的钥匙。基本上你能做的任何事情都是概率性的,10e-9 左右的频率使它相当绝望。不过,如果您预处理这些数据,为什么不同时推断模式呢?一种或另一种你触及所有领域的方式,对吧? 好问题,这可能是最好的方法。我试图尽量减少我必须编写的代码量。也许我可以深入研究 pyspark 使用的模式推理功能并利用这些功能。我会检查一下;谢谢指点。 【参考方案1】:

更多的谷歌搜索引导我找到解决方案。

首先创建一个强大的数据框连接器(unionAll 无法合并架构):

def addEmptyColumns(df, colNames):
    exprs = df.columns + ["null as " + colName for colName in colNames]
    return df.selectExpr(*exprs)


def concatTwoDfs(left, right):
    # append columns from right df to left df
    missingColumnsLeft = set(right.columns) - set(left.columns)
    left = addEmptyColumns(left, missingColumnsLeft)

    # append columns from left df to right df
    missingColumnsRight = set(left.columns) - set(right.columns)
    right = addEmptyColumns(right, missingColumnsRight)

    # let's set the same order of columns
    right = right[left.columns]

     # finally, union them
    return left.unionAll(right)


def concat(dfs):
    return reduce(concatTwoDfs, dfs)

(来自https://lab.getbase.com/pandarize-spark-dataframes/的代码)

然后获取不同的键,制作数据帧列表,并将它们连接起来:

keys = data.map(lambda x: x['key']).distinct().collect()

a_grp = [data.filter(lambda x: x['key']==k).toDF() for k in keys]

concat(a_grp).show()

+-----+----+---+----+-----+
|    a|   b|key|   y|    z|
+-----+----+---+----+-----+
|  1.0| 2.0| k1|null| null|
|  1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
|  1.0| 2.0| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
+-----+----+---+----+-----+

【讨论】:

以上是关于基于pyspark中的键有效地推断数据帧模式的主要内容,如果未能解决你的问题,请参考以下文章

在 pandas 数据帧上应用 Pyspark 管道

pyspark如何有效地进行这种转换? [重复]

基于 Pyspark 中的键加入 RDD

如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?

Pyspark:基于所有列减去/差异 pyspark 数据帧

如何有效地将 PySpark 数据框中的行相乘?