基于 Spark 中另一个 RDD 的过滤器

Posted

技术标签:

【中文标题】基于 Spark 中另一个 RDD 的过滤器【英文标题】:Filter based on another RDD in Spark 【发布时间】:2014-10-06 10:13:17 【问题描述】:

我想只保留在第二个表中引用了部门 ID 的员工。

Employee table
LastName    DepartmentID
Rafferty    31
Jones   33
Heisenberg  33
Robinson    34
Smith   34

Department table
DepartmentID
31  
33  

我尝试了以下代码,但不起作用:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

有什么想法吗?我正在使用带有 Python 的 Spark 1.1.0。不过,我会接受 Scala 或 Python 的答案。

【问题讨论】:

您是否要求您的部门列表是 RDD? 并非如此。部门列表是从 HDFS 加载的,但不是很大。 【参考方案1】:

在这种情况下,您想要实现的是使用部门表中包含的数据在每个分区进行过滤: 这将是基本的解决方案:

val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filtercase (employee, d) => dept.contains(d)

如果您的部门数据很大,广播变量将通过一次将数据传送到所有节点来提高性能,而不必在每个任务中序列化它

val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filtercase (employee, d) => deptBC.value.contains(d)

虽然使用 join 可行,但它是一个非常昂贵的解决方案,因为它需要对数据进行分布式洗牌(byKey)来实现连接。鉴于要求是一个简单的过滤器,将数据发送到每个分区(如上所示)将提供更好的性能。

【讨论】:

如果我在这里错了,请原谅我,但是 partitionBy() 不会按键解决分布式洗牌吗?并不是说它会解决加入更昂贵的问题,因为我不认为它会,我只是说加入不需要 100% 的时间洗牌。【参考方案2】:

我终于实现了一个使用连接的解决方案。我不得不为部门添加一个 0 值以避免 Spark 异常:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
# invert id and name to get id as the key
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0]))
# add a 0 value to avoid an exception
department = sc.parallelize(department).map(lambda d: (d,0))

employee.join(department).map(lambda e: (e[1][0], e[0])).collect()

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)]

【讨论】:

【参考方案3】:

过滤多列中的多个值:

如果您从数据库中提取数据(此示例为 Hive 或 SQL 类型 db)并且需要过滤多个列,使用第一个过滤器加载表可能会更容易,然后迭代您的通过 RDD 过滤(多次小迭代是 Spark 编程的鼓励方式):


    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)

    val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)")
    val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20")
    val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500")


当然,您必须稍微了解您的数据才能筛选出正确的值,但这是分析过程的一部分。

【讨论】:

【参考方案4】:

对于上面的同一个例子,我想只保留第二个表中引用的部门 ID 中包含的员工。 但它必须是没有加入操作,我会在“包含”或“在”中看到它, 我的意思是 33 是​​“在”334 和 335

employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)

【讨论】:

以上是关于基于 Spark 中另一个 RDD 的过滤器的主要内容,如果未能解决你的问题,请参考以下文章

基于Scala中另一列的值映射RDD列

Spark RDD数据过滤

过滤计数等于输入文件 rdd Spark 的列

在 spark rdd 中过滤索引

Spark Streaming:如何定期刷新缓存的 RDD?

在 RDD 的过滤器转换中没有得到预期的结果