向 pyspark 数据框添加行索引(并排添加新列/连接数据框)
Posted
技术标签:
【中文标题】向 pyspark 数据框添加行索引(并排添加新列/连接数据框)【英文标题】:Adding row index to pyspark dataframe (to add a new column/concatenate dataframes side-by-side) 【发布时间】:2019-03-26 22:42:40 【问题描述】:我试图并排连接两个数据帧。我看到了this。在monotonically_increasing_id() 的描述中写道:
"monotonically_increasing_id() - 返回单调递增的 64 位整数。生成的 ID 保证单调递增且唯一,但不连续。当前实现将分区 ID 放在高 31 位,而低 33 位表示每个分区内的记录数。假设数据帧的分区少于 10 亿,每个分区的记录少于 80 亿。该函数是非确定性的,因为它的结果取决于分区 ID。
我试图了解我们如何假设 monotonically_increasing_id() 会为这两个加入的数据帧产生相同的结果,因为它是不确定的。如果它为这些数据帧生成不同的 row_numbers,那么它们就不会加入。 “结果取决于分区 ID”部分可能是答案,但我不明白这一点。谁能解释一下?
【问题讨论】:
【参考方案1】:这是迄今为止我发现的将索引添加到数据框 df
的最佳方法:
new_columns = df.columns + ["row_idx"]
# Adding row index
df = df\
.rdd\
.zipWithIndex()\
.map(lambda(row, rowindex): row + (rowindex,)).toDF()
# Renaming all the columns
df = df.toDF(*new_columns)
它确实有转换为rdd
然后返回数据帧的开销。但是,monotonically_increasing_id()
是不确定的,row_number()
需要一个Window
,除非与PARTITION BY
一起使用,否则它可能并不理想,否则它将所有数据洗牌到一个分区,defeating pyspark 的目的。
因此,要将列表添加为数据框中的新列,只需将列表转换为数据框
new_df = spark.createDataFrame([(l,) for l in lst], ['new_col'])
并像上面一样添加 row_number 。然后加入,
joined_df = df.join(new_df, ['row_idx'], 'inner')
【讨论】:
【参考方案2】:这是我的经验。 monotonically_increasing_id() 有点粗糙。对于小型用例,您将始终获得一个普遍增加的 ID。但是,如果您遇到复杂的洗牌或数据使用问题,它可以并且不会在每个滴答时以相同的值增加。我的意思是 DF1 从 1->~100000000 但是在重新洗牌期间 DF2 再次从 Spark 惰性实现重新计算,它从 1->~48000000 然后 48000001.23->100000000.23。这意味着我丢失了很多行。
我解决问题的方法是通过唯一的 Row_ID。为此,我在下面有一个名为 Row_Hash 的函数,它将遍历并在列的前面构建一个唯一的行 ID。无论有多少次洗牌或数据写入,我都保持了连接条件的唯一性。
编辑:我要做的是将数据框元数据的所有元素转换为数组。这样做的原因是您可以指定要查询的数组的哪些元素。这与数据帧不同,因为洗牌和重新分区,调用 take(n) 可能会产生不同的结果,但调用 array(n) 将始终输出相同的结果。
考虑到这一点,让我们回到我们需要在没有本地行标识符的地方创建一个本地行标识符的问题。为此,我们将行完全连接(这是针对没有行键的情况),在产品顶部调用 MD5(是的,有交叉的机会,但它非常低)。这将为每一行生成一个大字符串字符,使其与系统的其余部分分开,允许用户将其用作唯一的行连接键。
#Call in the input data frame
val inputDF = ...
#Returns a array of string on the columns of input dataframe
val columnArray = inputDF.columns
#In Scala a variable allows us to dynamically augment and update the value
#This is the start of the command where we are concatenating all fields and running and MD5, we just need to add in the other fields.
var commandString = "md5(concat("
#This will be a set of string of actions we want Spark to run on our columns.
#The reason we are passing through the names is because we want to return the base columns.
#Think of a select query
var commandArray = columnArray
#This is an iterator where we are going to move 1->n, n being the last element of the number of columns
var columnIterator = 1
#Run while there are still columns we have not acted upon.
while(columnIterator<=columnArray.length)
#We are going to take an N element from the columns and build a statement to cast it as a string
commandString = "cast(" + columnArray(columnIterator-1) + " as string)"
#This loop checks if we are not the last element of the column array, if so we add
#in a comma this allows us to have N many element be concatenated (I add the space because it is aesthetically pleasing)
if (columnIterator!=columnArray.length) commandString = commandString + ", "
#Iterator
columnIterator = columnIterator + 1
#I am appending the command we just build to the from of the command array with
#a few extra characters to end the local command and name it something consistent.
#So if we have a DF of Name, Addr, Date; this below statement will look like
#Array("md5(concat(cast(Name as string), cast(Addr as string), cast(Date as string)) as Row_Hash","Name","Addr","Date")
val commandArray = Array(commandString + ")) as Row_Hash") ++ commandArray
#Select Expr runs independent strings through a standard SQL framework (kinda little bit of column A, column B)
#Each string is its own element so based on the above example DF
#inputDF.selectExpr("Name", "length(Addr) as Addr_Length", "Addr", "Date)
#Will output a DF with four elements Name, an integer of the length of column Addr, Addr, and Date.
#In the previous lines of code we have build out those strings into the command array
#The usage of commandArray:_* means we want spark to run all elements of Array through the select statement.
val finalDF = inputDF.selectExpr(commandArray:_*)
【讨论】:
你能在pyspark中提供解决方案吗?我对斯卡拉不是很熟悉。比如我理解循环本身,但是 selectExpr 是做什么的呢? 还有,piiIterator
是什么?
当然!也完全忘记了一个连接。让我一到办公室就快速添加 cmets。以上是关于向 pyspark 数据框添加行索引(并排添加新列/连接数据框)的主要内容,如果未能解决你的问题,请参考以下文章