在pyspark中用整数编码一列

Posted

技术标签:

【中文标题】在pyspark中用整数编码一列【英文标题】:Encode a column with integer in pyspark 【发布时间】:2018-05-20 14:35:22 【问题描述】:

我必须在 pyspark(spark 2.0) 的大 DataFrame 中对列进行编码。所有值几乎都是唯一的(大约 10 亿个值)。 最好的选择可能是 StringIndexer,但由于某种原因,它总是失败并终止我的 spark 会话。 我可以以某种方式编写这样的函数吗:

id_dict() = dict()
def indexer(x):
    id_dict.setdefault(x, len(id_dict))
    return id_dict[x]

并使用 id_dict 将其映射到 DataFrame 以保存 items()?这个字典会在每个执行者上同步吗? 我需要所有这些来预处理 spark.mllib ALS 模型的元组 ('x', 3, 5)。 谢谢你。

【问题讨论】:

【参考方案1】:

StringIndexer 将所有标签保存在内存中,因此如果值几乎是唯一的,它就不会缩放。

您可以获取唯一值、排序和添加 id,这很昂贵,但在这种情况下更健壮:

from pyspark.sql.functions import monotonically_increasing_id

df = spark.createDataFrame(["a", "b", "c", "a", "d"], "string").toDF("value")

indexer = (df.select("value").distinct()
  .orderBy("value")
  .withColumn("label", monotonically_increasing_id()))

df.join(indexer, ["value"]).show()
# +-----+-----------+
# |value|      label|
# +-----+-----------+
# |    d|25769803776|
# |    c|17179869184|
# |    b| 8589934592|
# |    a|          0|
# |    a|          0|
# +-----+-----------+

请注意,标签不是连续的,并且可能因运行而异,或者如果spark.sql.shuffle.partitions 更改,则可能会更改。如果不可接受,您将不得不使用RDDs

from operator import itemgetter

indexer = (df.select("value").distinct()
    .rdd.map(itemgetter(0)).zipWithIndex()
    .toDF(["value", "label"]))

df.join(indexer, ["value"]).show()
# +-----+-----+
# |value|label|
# +-----+-----+
# |    d|    0|
# |    c|    1|
# |    b|    2|
# |    a|    3|
# |    a|    3|
# +-----+-----+

【讨论】:

为什么在选择不同的之后需要对它们进行排序? 因为distinct 不保证任何特定的顺序。因此,如果重新计算数据,标签可能会有所不同,即使在同一个应用程序中也是如此。这可能会在您的应用程序中导致一些相当意外的问题。排序确保结果不会改变,除非配置(分区数改变)。 感谢 monotonically_increasing_id() 的选项效果很好。但是在使用站点名称(字符串类型)加入列后,值会发生变化(例如 - “murmanout.ru”变为“u'\x047\x19\x17\u0410\u0422\u0448\x1f\u041c\u2014_'”。如何我可以防止这种行为吗? 看起来像是在调用一些工人端 Python 代码(与这个问题无关)。

以上是关于在pyspark中用整数编码一列的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - 根据另一列值的降序添加递增的整数排名值

在pyspark中使用整数对列进行编码

PySpark 传递列表到用户定义函数

在pyspark中用平均值填充缺失值

如何在 PySpark 的分组对象中插入一列?

如何在 PySpark 上将所有功能组合成一列?