加入两个流水线RDD

Posted

技术标签:

【中文标题】加入两个流水线RDD【英文标题】:Join two pipelinedRDDs 【发布时间】:2020-01-01 17:08:47 【问题描述】:

我正在尝试在 pyspart jupyter notebook 中使用 .join() 加入两个 pipelinedRDD

第一个 RDD:

primaryType.take(5)

['DECEPTIVE PRACTICE',
 'CRIM SEXUAL ASSAULT',
 'BURGLARY',
 'THEFT',
 'CRIM SEXUAL ASSAULT']

第二个RDD:

districts.take(5)
['004', '022', '008', '003', '001']

加入 RDD:

rdd_joined = primaryType.join(districts)
rdd_joined.take(5)

输出:

[]

我在这里做错了什么?

【问题讨论】:

你不需要pairRDDs来执行连接吗? spark.apache.org/docs/0.8.0/api/pyspark/… 好的,我想我错过了一些东西......那么预期的输出是什么?好像你正在和土豆一起犯罪:) 顺便说一句:spark.apache.org/docs/latest/api/python/… 我想我需要为 RDD 中的每个元素附加唯一键,然后加入它们? 【参考方案1】:

应该有一些唯一的键来连接两个 rdds,所以使用 rdd.zipWithIndex() 为两个 rdds 创建索引,然后尝试加入它们

districts.take(5)
['004', '022', '008', '003', '001']

primaryType.take(5)

['DECEPTIVE PRACTICE',
 'CRIM SEXUAL ASSAULT',
 'BURGLARY',
 'THEFT',
 'CRIM SEXUAL ASSAULT']

districts=districts.zipWithIndex()

districts.take(5)
[('004', 0), ('022', 1), ('008', 2), ('003', 3), ('001', 4)]

districts=districts.map(lambda (x,y):(y,x))

primaryType=primaryType.zipWithIndex()
primaryType=primaryType.map(lambda (x,y):(y,x))
primaryType.join(districts).map(lambda (x,y):y).take(5)

[('DECEPTIVE PRACTICE', '004'), ('CRIM SEXUAL ASSAULT', '001'), ('CRIM SEXUAL ASSAULT', '022'), ('BURGLARY', '008'), ('THEFT', '003')]

【讨论】:

以上是关于加入两个流水线RDD的主要内容,如果未能解决你的问题,请参考以下文章

在 Pyspark 中将流水线 RDD 转换为 Dataframe [重复]

新建项目加入到生成流水线中

Spark宽依赖窄依赖

突破硬件瓶颈:旧时代的遗珠——并行流水线架构

持续集成与持续部署宝典Part 4:创建持续部署流水线

持续集成工具篇:Jenkins 与流水线管理转