如何用来自另一个 DataFrame 的匹配 id 替换单词(在一个 DataFrame 中)?

Posted

技术标签:

【中文标题】如何用来自另一个 DataFrame 的匹配 id 替换单词(在一个 DataFrame 中)?【英文标题】:How to replace words (in one DataFrame) with matching ids from another DataFrame? 【发布时间】:2017-05-25 15:16:37 【问题描述】:

询问this question 的变体,它被要求用于 Pandas,我有类似的情况,除了我正在使用 spark-shellpyspark

我有一个包含域(顶点)列表的数据框:

index            domain
0            airbnb.com
1          facebook.com
2                st.org
3              index.co
4        crunchbase.com
5               avc.com
6        techcrunch.com
7            google.com

我有另一个数据框,其中包含这些域(边缘)之间的连接:

           source_domain    destination_domain
              airbnb.com            google.com
            facebook.com            google.com
                  st.org          facebook.com
                  st.org            airbnb.com
                  st.org        crunchbase.com
                index.co        techcrunch.com
          crunchbase.com        techcrunch.com
          crunchbase.com            airbnb.com
                 avc.com        techcrunch.com
          techcrunch.com                st.org
          techcrunch.com            google.com
          techcrunch.com          facebook.com

如何将边缘数据帧中的每个单元格替换为域(也称为顶点)数据帧中的相应索引?因此,edges 数据框中的第一行可能最终看起来像:

###### Before: ##################### 
           facebook.com google.com   
###### After:  #####################   
           1            7

数据帧将增长到至少几百 GB。

如何在 Spark 中执行此操作?

【问题讨论】:

【参考方案1】:

TL;DR 将数据集保存为 CSV 文件,分别为 vertices.csvedges.csvreadjoin

// load the datasets
val vertices = spark.read.option("header", true).csv("vertices.csv")
val edges = spark.read.option("header", true).csv("edges.csv")

// indexify the source_domain
val sources = edges.
  join(vertices).
  where(edges("source_domain") === vertices("domain")).
  withColumnRenamed("index", "source_index")

// indexify the destination_domain
val destinations = edges.
  join(vertices).
  where(edges("destination_domain") === vertices("domain")).
  withColumnRenamed("index", "destination_index")

val result = sources.
  join(destinations, Seq("source_domain", "destination_domain")).
  select("source_index", "destination_index")
scala> result.show
+------------+-----------------+
|source_index|destination_index|
+------------+-----------------+
|           0|                7|
|           1|                7|
|           2|                1|
|           2|                0|
|           2|                4|
|           3|                6|
|           4|                6|
|           4|                0|
|           5|                6|
|           6|                2|
|           6|                7|
|           6|                1|
+------------+-----------------+

【讨论】:

以上是关于如何用来自另一个 DataFrame 的匹配 id 替换单词(在一个 DataFrame 中)?的主要内容,如果未能解决你的问题,请参考以下文章

如何用另一个表中的匹配值替换/更新列中每个字符串的所有实例?

如何用空格匹配、计数和替换字符串,但它们不是另一个字母字符串的子字符串?

PySpark - 如何使用连接更新 Dataframe?

Meteor:你如何用 _id 字段将一个集合中的字段填充到另一个集合中?

Meteor:你如何用 _id 字段将一个集合中的字段填充到另一个集合中?

如何用另一个键替换散列键