基于 Spark 中另一个 RDD 的过滤器
Posted
技术标签:
【中文标题】基于 Spark 中另一个 RDD 的过滤器【英文标题】:Filter based on another RDD in Spark 【发布时间】:2014-10-06 10:13:17 【问题描述】:我想只保留在第二个表中引用了部门 ID 的员工。
Employee table
LastName DepartmentID
Rafferty 31
Jones 33
Heisenberg 33
Robinson 34
Smith 34
Department table
DepartmentID
31
33
我尝试了以下代码,但不起作用:
employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()
Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
有什么想法吗?我正在使用带有 Python 的 Spark 1.1.0。不过,我会接受 Scala 或 Python 的答案。
【问题讨论】:
您是否要求您的部门列表是 RDD? 并非如此。部门列表是从 HDFS 加载的,但不是很大。 【参考方案1】:在这种情况下,您想要实现的是使用部门表中包含的数据在每个分区进行过滤: 这将是基本的解决方案:
val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filtercase (employee, d) => dept.contains(d)
如果您的部门数据很大,广播变量将通过一次将数据传送到所有节点来提高性能,而不必在每个任务中序列化它
val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filtercase (employee, d) => deptBC.value.contains(d)
虽然使用 join 可行,但它是一个非常昂贵的解决方案,因为它需要对数据进行分布式洗牌(byKey)来实现连接。鉴于要求是一个简单的过滤器,将数据发送到每个分区(如上所示)将提供更好的性能。
【讨论】:
如果我在这里错了,请原谅我,但是 partitionBy() 不会按键解决分布式洗牌吗?并不是说它会解决加入更昂贵的问题,因为我不认为它会,我只是说加入不需要 100% 的时间洗牌。【参考方案2】:我终于实现了一个使用连接的解决方案。我不得不为部门添加一个 0 值以避免 Spark 异常:
employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
# invert id and name to get id as the key
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0]))
# add a 0 value to avoid an exception
department = sc.parallelize(department).map(lambda d: (d,0))
employee.join(department).map(lambda e: (e[1][0], e[0])).collect()
output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)]
【讨论】:
【参考方案3】:过滤多列中的多个值:
如果您从数据库中提取数据(此示例为 Hive 或 SQL 类型 db)并且需要过滤多个列,使用第一个过滤器加载表可能会更容易,然后迭代您的通过 RDD 过滤(多次小迭代是 Spark 编程的鼓励方式):
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)")
val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20")
val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500")
当然,您必须稍微了解您的数据才能筛选出正确的值,但这是分析过程的一部分。
【讨论】:
【参考方案4】:对于上面的同一个例子,我想只保留第二个表中引用的部门 ID 中包含的员工。 但它必须是没有加入操作,我会在“包含”或“在”中看到它, 我的意思是 33 是“在”334 和 335
employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
【讨论】:
以上是关于基于 Spark 中另一个 RDD 的过滤器的主要内容,如果未能解决你的问题,请参考以下文章