PySpark在嵌套数组中反转StringIndexer

Posted

技术标签:

【中文标题】PySpark在嵌套数组中反转StringIndexer【英文标题】:PySpark reversing StringIndexer in nested array 【发布时间】:2017-08-20 22:26:47 【问题描述】:

我正在使用 PySpark 使用 ALS 进行协同过滤。我的原始用户和项目 ID 是字符串,所以我使用 StringIndexer 将它们转换为数字索引(PySpark 的 ALS 模型要求我们这样做)。

在我拟合模型后,我可以获得每个用户的前 3 条建议,如下所示:

recs = (
    model
    .recommendForAllUsers(3)
)

recs 数据框如下所示:

+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

我想用这个数据框创建一个巨大的 JSOM 转储,我可以这样:

(
    recs
    .toJSON()
    .saveAsTextFile("name_i_must_hide.recs")
)

这些 json 的示例是:


  "userIdIndex": 1580,
  "recommendations": [
    
      "productIdIndex": 10096,
      "rating": 3.6725707
    ,
    
      "productIdIndex": 10141,
      "rating": 3.61542
    ,
    
      "productIdIndex": 11591,
      "rating": 3.536216
    
  ]

userIdIndexproductIdIndex 键是由 StringIndexer 转换产生的。

我怎样才能恢复这些列的原始值?我怀疑我必须使用IndexToString 转换器,但由于数据嵌套在recs 数据框内的数组中,所以我不太清楚。

我尝试使用 Pipeline 评估器 (stages=[StringIndexer, ALS, IndexToString]),但该评估器似乎不支持这些索引器。

干杯!

【问题讨论】:

【参考方案1】:

在这两种情况下,您都需要访问标签列表。这可以使用StringIndexerModel

访问
user_indexer_model = ...  # type: StringIndexerModel
user_labels = user_indexer_model.labels

product_indexer_model = ...  # type: StringIndexerModel
product_labels = product_indexer_model.labels

或列元数据。

对于userIdIndex,您只需申请IndexToString

from pyspark.ml.feature import IndexToString

user_id_to_label = IndexToString(
    inputCol="userIdIndex", outputCol="userId", labels=user_labels)
user_id_to_label.transform(recs)

对于建议,您需要udf 或这样的表达式:

from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems

product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)

【讨论】:

太棒了!工作:)【参考方案2】:

至少在我的情况下,作为性能问题给出的答案花费了太长时间。 你可以使用IndexToString 我提供了一个简单的代码sn-p(假设用户和产品有两个StringIndexer


from pyspark.ml.feature import StringIndexer, IndexToString
idx_to_user = IndexToString(inputCol='userIdIndex',outputCol='user_id').setLabels(self.user_indexer.labels)
idx_to_prod = IndexToString(inputCol='productIdIndex',outputCol='product_id').setLabels(self.prod_indexer.labels)

recoms = idx_to_user.transform(recs)
res = self.idx_to_prod.transform(recoms.select(F.col('user_id'),F.explode('recommendations')).select('user_id','col.productIdIndex','col.rating'))
result = res.select('user_id','product_id','rating')


【讨论】:

以上是关于PySpark在嵌套数组中反转StringIndexer的主要内容,如果未能解决你的问题,请参考以下文章

当数据包含具有两种不同数据类型的嵌套数组时,在 PySpark 中定义模式

如何从aws glue pyspark作业中的嵌套数组中提取数据

在 PySpark 中反转 Group By

PySpark 嵌套数据框

在 Pyspark 代码中读取嵌套的 Json 文件。 pyspark.sql.utils.AnalysisException:

在pyspark中展平嵌套的json scala代码