在pyspark中使用整数对列进行编码

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在pyspark中使用整数对列进行编码相关的知识,希望对你有一定的参考价值。

我必须在pyspark(spark 2.0)的大型DataFrame中对列进行编码。所有值几乎都是唯一的(约1000mln值)。最好的选择可能是StringIndexer,但由于某种原因它总是失败并杀死我的火花会话。我可以以某种方式编写这样的函数:

id_dict() = dict()
def indexer(x):
    id_dict.setdefault(x, len(id_dict))
    return id_dict[x]

并将其映射到DataFrame,id_dict保存items()?这个字典会在每个遗嘱执行人身上同步吗?我需要所有这些来预处理spark.mllib ALS模型的元组('x',3,5)。谢谢。

答案

StringIndexer将所有标签保留在内存中,因此如果值几乎是唯一的,则它不会缩放。

您可以采用唯一值,排序和添加ID,这很昂贵,但在这种情况下更加健壮:

from pyspark.sql.functions import monotonically_increasing_id

df = spark.createDataFrame(["a", "b", "c", "a", "d"], "string").toDF("value")

indexer = (df.select("value").distinct()
  .orderBy("value")
  .withColumn("label", monotonically_increasing_id()))

df.join(indexer, ["value"]).show()
# +-----+-----------+
# |value|      label|
# +-----+-----------+
# |    d|25769803776|
# |    c|17179869184|
# |    b| 8589934592|
# |    a|          0|
# |    a|          0|
# +-----+-----------+

请注意,标签不是连续的,可能因运行而异,或者如果spark.sql.shuffle.partitions发生变化,则可能会发生变化。如果不可接受,你将不得不使用RDDs

from operator import itemgetter

indexer = (df.select("value").distinct()
    .rdd.map(itemgetter(0)).zipWithIndex()
    .toDF(["value", "label"]))

df.join(indexer, ["value"]).show()
# +-----+-----+
# |value|label|
# +-----+-----+
# |    d|    0|
# |    c|    1|
# |    b|    2|
# |    a|    3|
# |    a|    3|
# +-----+-----+

以上是关于在pyspark中使用整数对列进行编码的主要内容,如果未能解决你的问题,请参考以下文章

使用 PySpark 数据框的成对列操作(例如点积)

在 Pyspark 中使用整数与十进制值进行过滤

在 Hadoop 中运行 pyspark 时不是文件异常

哈夫曼编码课程设计+最小优先对列建树。

如何在pyspark中对数组中的标签进行编码

如何在pyspark中对数组中的标签进行编码