加入两个流水线RDD
Posted
技术标签:
【中文标题】加入两个流水线RDD【英文标题】:Join two pipelinedRDDs 【发布时间】:2020-01-01 17:08:47 【问题描述】:我正在尝试在 pyspart jupyter notebook 中使用 .join() 加入两个 pipelinedRDD
第一个 RDD:
primaryType.take(5)
['DECEPTIVE PRACTICE',
'CRIM SEXUAL ASSAULT',
'BURGLARY',
'THEFT',
'CRIM SEXUAL ASSAULT']
第二个RDD:
districts.take(5)
['004', '022', '008', '003', '001']
加入 RDD:
rdd_joined = primaryType.join(districts)
rdd_joined.take(5)
输出:
[]
我在这里做错了什么?
【问题讨论】:
你不需要pairRDDs来执行连接吗? spark.apache.org/docs/0.8.0/api/pyspark/… 好的,我想我错过了一些东西......那么预期的输出是什么?好像你正在和土豆一起犯罪:) 顺便说一句:spark.apache.org/docs/latest/api/python/… 我想我需要为 RDD 中的每个元素附加唯一键,然后加入它们? 【参考方案1】:应该有一些唯一的键来连接两个 rdds,所以使用 rdd.zipWithIndex() 为两个 rdds 创建索引,然后尝试加入它们
districts.take(5)
['004', '022', '008', '003', '001']
primaryType.take(5)
['DECEPTIVE PRACTICE',
'CRIM SEXUAL ASSAULT',
'BURGLARY',
'THEFT',
'CRIM SEXUAL ASSAULT']
districts=districts.zipWithIndex()
districts.take(5)
[('004', 0), ('022', 1), ('008', 2), ('003', 3), ('001', 4)]
districts=districts.map(lambda (x,y):(y,x))
primaryType=primaryType.zipWithIndex()
primaryType=primaryType.map(lambda (x,y):(y,x))
primaryType.join(districts).map(lambda (x,y):y).take(5)
[('DECEPTIVE PRACTICE', '004'), ('CRIM SEXUAL ASSAULT', '001'), ('CRIM SEXUAL ASSAULT', '022'), ('BURGLARY', '008'), ('THEFT', '003')]
【讨论】:
以上是关于加入两个流水线RDD的主要内容,如果未能解决你的问题,请参考以下文章