用于交替值组的 Pyspark 自动增量
Posted
技术标签:
【中文标题】用于交替值组的 Pyspark 自动增量【英文标题】:Pyspark autoincrement for alternating group of values 【发布时间】:2017-10-06 12:46:11 【问题描述】:我正在尝试使用 Pyspark 在 Spark DataFrame 中创建一个新列,它表示基于交替布尔值组的自动增量(或 ID)。假设我有以下 DataFrame:
df.show()
+-----+------------+-------------+
|id |par_id |is_on |
+-----+------------+-------------+
|40002|1 |true |
|40003|2 |true |
|40004|null |false |
|40005|17 |true |
|40006|2 |true |
|40007|17 |true |
|40008|240 |true |
|40009|1861 |true |
|40010|1862 |true |
|40011|2 |true |
|40012|null |false |
|40013|1863 |true |
|40014|626 |true |
|40016|208 |true |
|40017|2 |true |
|40018|null |false |
|40019|2 |true |
|40020|1863 |true |
|40021|2 |true |
|40022|2 |true |
+-----+------------+-------------+
我想使用is_on
属性使用称为id2
的增量ID 扩展此DataFrame。也就是说,每组布尔值都应该得到一个递增的 id。生成的 DataFrame 应如下所示:
df.show()
+-----+------------+-------------+-----+
|id |par_id |is_on |id2 |
+-----+------------+-------------+-----+
|40002|1 |true |1 |
|40003|2 |true |1 |
|40004|null |false |2 |
|40005|17 |true |3 |
|40006|2 |true |3 |
|40007|17 |true |3 |
|40008|240 |true |3 |
|40009|1861 |true |3 |
|40010|1862 |true |3 |
|40011|2 |true |3 |
|40012|null |false |4 |
|40013|1863 |true |5 |
|40014|626 |true |5 |
|40016|208 |true |5 |
|40017|2 |true |5 |
|40018|null |false |6 |
|40019|2 |true |7 |
|40020|1863 |true |7 |
|40021|2 |true |7 |
|40022|2 |true |7 |
+-----+------------+-------------+-----+
您对此有什么建议吗?如何为此编写用户定义函数?
【问题讨论】:
【参考方案1】: #this is python spark testing file
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, struct
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark=SparkSession.builder.master("local").appName("durga prasad").config("spark.sql.warehouse.dir","/home/hadoop/spark-2.0.1-bin-hadoop2.7/bin/test_warehouse").getOrCreate()
df=spark.read.csv("/home/hadoop/stack_test.txt",sep=",",header=True)
# This is udf
count=1 # these variable is changed based on function call
prStr='' # these variable is changed based on function call
def test_fun(str):
global count
global prStr
if str=="false":
count=count + 1
prStr=str
return count
if str=="true" and prStr =='false':
count=count + 1
prStr=str
return count
elif str=='true':
count=count
prStr=str
return count
# udf function end
testUDF = udf(test_fun, StringType()) # register udf
df.select("id","par_id","is_on",testUDF('is_on').alias("id2")).show()
####output
+-----+------+-----+---+
| id|par_id|is_on|id2|
+-----+------+-----+---+
|40002| 1| true| 1|
|40003| 2| true| 1|
|40004| null|false| 2|
|40005| 17| true| 3|
|40006| 2| true| 3|
|40007| 17| true| 3|
|40008| 240| true| 3|
|40009| 1861| true| 3|
|40010| 1862| true| 3|
|40011| 2| true| 3|
|40012| null|false| 4|
|40013| 1863| true| 5|
|40014| 626| true| 5|
|40016| 208| true| 5|
|40017| 2| true| 5|
|40018| null|false| 6|
|40019| 2| true| 7|
|40020| 1863| true| 7|
|40021| 2| true| 7|
|40022| 2| true| 7|
+-----+------+-----+---+
【讨论】:
以上是关于用于交替值组的 Pyspark 自动增量的主要内容,如果未能解决你的问题,请参考以下文章