如何在 Spark SQL(PySpark) 中实现自增
Posted
技术标签:
【中文标题】如何在 Spark SQL(PySpark) 中实现自增【英文标题】:How to implement auto increment in spark SQL(PySpark) 【发布时间】:2016-10-25 04:20:43 【问题描述】:我需要在我的 spark sql 表中实现一个自动增量列,我该怎么做。请指导我。我正在使用 pyspark 2.0
谢谢 卡利安
【问题讨论】:
查看***.com/questions/31955309/… @MRSrinivas 感谢您的详细回复我会试试的,最近我尝试从 pyspark.sql.functions import monotonically_increasing_id 解决它已经工作的问题。它为从 0 开始索引的每一行提供 id,非常感谢 【参考方案1】:我会编写/重用 stateful Hive udf 并向 pySpark 注册,因为 Spark SQL 确实对 Hive 有很好的支持。
在下面的代码中检查这一行 @UDFType(deterministic = false, stateful = true)
以确保它是有状态的 UDF。
package org.apache.hadoop.hive.contrib.udf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;
/**
* UDFRowSequence.
*/
@Description(name = "row_sequence",
value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF
private LongWritable result = new LongWritable();
public UDFRowSequence()
result.set(0);
public LongWritable evaluate()
result.set(result.get() + 1);
return result;
// End UDFRowSequence.java
现在构建 jar 并在 pyspark 启动时添加位置。
$ pyspark --jars your_jar_name.jar
然后注册sqlContext
。
sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")
现在在选择查询中使用row_seq()
sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")
Project to use Hive UDFs in pySpark
【讨论】:
我已经按照您指定的方式构建了 jar,并且还创建了临时函数。现在我创建了一个表sqlContext.sql("Create table abc(id int, name string)")
和sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'John'")
和sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'Tim'")
。当我选择 * 语句时,我同时得到 iD
作为 1
而不是 1
和 2
。
您的代码中是否在标签@UDFType
内设置了stateful = true
?
我需要这样的东西,但问题是,它是否可以扩展 2 亿的数据。实际上,我想将包含 2 亿行的大文件分解为包含文件的确切 10K 行的较小文件。我想为每一行添加自动递增数,并在这样的帮助下批量读取(id > 10,001 和 id
是否可以在 python 中执行此 UDF?并在 sqlContext 中注册?以上是关于如何在 Spark SQL(PySpark) 中实现自增的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 jupyter notebook 在 pyspark 中的 Hive 上使用 %sql Magic 字符串启用 spark SQL
如何在 Apache Spark (pyspark) 中使用自定义类?