使用另一个 RDD/df 在 Spark RDD 或数据帧中执行查找/翻译

Posted

技术标签:

【中文标题】使用另一个 RDD/df 在 Spark RDD 或数据帧中执行查找/翻译【英文标题】:Performing lookup/translation in a Spark RDD or data frame using another RDD/df 【发布时间】:2015-10-13 01:29:37 【问题描述】:

我很难实现一些看起来应该很容易的东西:

我的目标是使用第二个 RDD/dataframe 作为查找表或翻译字典在 RDD/dataframe 中进行翻译。我想在多个栏目中进行这些翻译。

解释问题的最简单方法是举例说明。假设我输入以下两个 RDD:

Route SourceCityID DestinationCityID
A     1            2
B     1            3
C     2            1

CityID CityName
1      London
2      Paris
3      Tokyo

我想要的输出 RDD 是:

Route SourceCity DestinationCity
A     London     Paris
B     London     Tokyo
C     Paris      London

我应该如何制作它?

这是 SQL 中的一个简单问题,但我不知道 Spark 中 RDD 的明显解决方案。 joincogroup 等方法似乎不太适合多列 RDD,并且不允许指定要加入的列。

有什么想法吗? SQLContext 是答案吗?

【问题讨论】:

使用 Dataframe 和 SparkSQL 将帮助您找到所需的内容。它基本上是具有不同语法的 sql。 表/RDD 的大小是多少? CityID/CityName RDD 是否比 Route RDD 小几倍?在这种情况下,我会将 RDD 的结果收集为地图并广播它,以便它可以在每个 Worker 上进行本地查找。 【参考方案1】:

rdd方式:

routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ])
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")])


print routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities) \
.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities). \
map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect()

哪些打印:

[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')]

以及SQLContext方式:

from pyspark.sql import HiveContext
from pyspark.sql import SQLContext

df_routes = sqlContext.createDataFrame(\
routes, ["Route", "SourceCityID", "DestinationCityID"])
df_cities = sqlContext.createDataFrame(\
cities, ["CityID", "CityName"])

temp =  df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \
.select("Route", "DestinationCityID", "CityName")
.withColumnRenamed("CityName", "SourceCity")

print temp.join(df_cities, temp.DestinationCityID == df_cities.CityID) \
.select("Route", "SourceCity", "CityName")
.withColumnRenamed("CityName", "DestinationCity").collect()

哪些打印:

[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'),
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'),
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')]

【讨论】:

【参考方案2】:

假设我们有两个包含路线和城市的 RDD:

val routes = sc.parallelize(List(("A", 1, 2),("B", 1, 3),("C", 2, 1)))
val citiesByIDRDD = sc.parallelize(List((1, "London"), (2, "Paris"), (3, "Tokyo")))

有几种方法可以实现城市查找。假设与包含许多项目的路线相比,城市查找包含的项目很少。在这种情况下,让我们从收集城市作为地图开始,该地图由驱动程序发送给每个任务。

val citiesByID = citiesByIDRDD.collectAsMap

routes.mapr => (r._1, citiesByID(r._2), citiesByID(r._3)).collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))

为避免将查找表发送给每个任务,而只发送给工作人员一次,您可以扩展现有代码广播查找图。

val bCitiesByID = sc.broadcast(citiesByID)

routes.mapr => (r._1, bCitiesByID.value(r._2), bCitiesByID.value(r._3)).collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))

我认为这里不需要数据框,但如果您愿意,您可以:

import sqlContext.implicits._

case class Route(id: String, from: Int, to: Int)
case class City(id: Int, name: String)

val cities = List(City(1, "London"), City(2, "Paris"), City(3, "Tokyo"))
val routes = List(Route("A", 1, 2), Route("B", 1, 3), Route("C", 2, 1))

val citiesDf = cities.df
citiesDf.registerTempTable("cities")
val routesDf = routes.df
citiesDf.registerTempTable("routes")

routesDf.show
+---+----+---+
| id|from| to|
+---+----+---+
|  A|   1|  2|
|  B|   1|  3|
|  C|   2|  1|
+---+----+---+

citiesDf.show
+---+------+
| id|  name|
+---+------+
|  1|London|
|  2| Paris|
|  3| Tokyo|
+---+------+

您提到这是 SQL 中的一个简单问题,所以我假设您可以从这里开始。执行 SQL 是这样的:

sqlContext.sql ("SELECT COUNT(*) FROM routes")

【讨论】:

要使用collects,您必须确保所有数据都适合主节点 是的,没错。该问题的示例提示使用收集和广播的此解决方案,但这仅在城市查找表与路由相比相对较小且小到足以放入执行程序/驱动程序内存时才有意义。

以上是关于使用另一个 RDD/df 在 Spark RDD 或数据帧中执行查找/翻译的主要内容,如果未能解决你的问题,请参考以下文章

spark rdd df dataset

大数据Spark Dataset

为啥 df.limit 在 Pyspark 中不断变化?

PySpark:TypeError:条件应该是字符串或列

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

spark:根据另一个 rdd 的序列加入 rdd