Spark DataFrame 的通用“reduceBy”或“groupBy + aggregate”功能

Posted

技术标签:

【中文标题】Spark DataFrame 的通用“reduceBy”或“groupBy + aggregate”功能【英文标题】:Generic "reduceBy" or "groupBy + aggregate" functionality with Spark DataFrame 【发布时间】:2015-12-24 18:27:14 【问题描述】:

代码审查问题:Generic “reduceBy” or “groupBy + aggregate” functionality with Spark DataFrame


大家好。也许我在这里完全重新发明了***,或者我发明了一些有用的东西。你们有人能告诉我是否有更好的方法吗?这是我正在尝试做的事情:

我想要一个通用的 reduceBy 函数,它像 RDD 的 reduceByKey 一样工作,但可以让我使用 Spark DataFrame 中的任何列。您可能会说我们已经有了它,它被称为 groupBy,但据我所知,groupBy 只允许您使用一些非常有限的选项进行聚合。我想分组,然后运行任意函数进行聚合。有人已经这样做了吗?

基本上,我正在使用一个看起来像这样的 Spark DataFrame...

+----------+---------+-----+-------------+------------+-------------------+
| birthdate|favecolor| name|twitterhandle|facebookpage|           favesong|
+----------+---------+-----+-------------+------------+-------------------+
|2000-01-01|     blue|Alice|     allyblue|        null|               null|
|1999-12-31|     null|  Bob|         null|      BobbyG| Gangsters Paradise|
|      null|     null|Alice|         null|        null|Rolling in the Deep|
+----------+---------+-----+-------------+------------+-------------------+

...并减少列“名称”以获得此:

+----------+---------+-------------------+-----+-------------+------------+
| birthdate|favecolor|           favesong| name|twitterhandle|facebookpage|
+----------+---------+-------------------+-----+-------------+------------+
|2000-01-01|     blue|Rolling in the Deep|Alice|     allyblue|        null|
|1999-12-31|     null| Gangsters Paradise|  Bob|         null|      BobbyG|
+----------+---------+-------------------+-----+-------------+------------+

我刚刚注意到列顺序的变化。我想我可以通过在开始之前记下架构来很快解决这个问题。但无论如何,我必须编写大量代码才能使其正常工作,而这似乎是一个如此简单的操作,其他人现在应该已经完成​​了。

这是使用 Python 3.5.1 和 Spark 1.5.2 编写的代码:

 def addEmptyColumns(df, colNames):
     """
     https://lab.getbase.com/pandarize-spark-dataframes/

     :param df: 
     :param colNames: 
     :return:
     """
     exprs = df.columns + ["null as " + colName for colName in colNames]
     return df.selectExpr(*exprs)


 def concatTwoDfs(left, right):
     """
     https://lab.getbase.com/pandarize-spark-dataframes/

     :param left: 
     :param right: 
     :return:
     """
     # append columns from right df to left df
     missingColumnsLeft = set(right.columns) - set(left.columns)
     left = addEmptyColumns(left, missingColumnsLeft)

     # append columns from left df to right df
     missingColumnsRight = set(left.columns) - set(right.columns)
     right = addEmptyColumns(right, missingColumnsRight)

     # let's set the same order of columns
     right = right[left.columns]

      # finally, union them
     return left.unionAll(right)


 def reduce(function, iterable, initializer=None):
     """
     A copy of the rough code from Python 2's reduce function documentation.  Why did Python 3 get rid of it?

     Apply function of two arguments cumulatively to the items of iterable, from left to right, so as to reduce the
     iterable to a single value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates ((((1+2)+3)+4)+5).
     The left argument, x, is the accumulated value and the right argument, y, is the update value from the iterable.
     If the optional initializer is present, it is placed before the items of the iterable in the calculation, and
     serves as a default when the iterable is empty. If initializer is not given and iterable contains only one item,
     the first item is returned.

     :param function: use this function to reduce the elements of iterable
     :param iterable:
     :param initializer:
     :return:
     """
     it = iter(iterable)
     if initializer is None:
         try:
             initializer = next(it)
         except StopIteration:
             raise TypeError('reduce() of empty sequence with no initial value')
     accum_value = initializer
     for x in it:
         accum_value = function(accum_value, x)
     return accum_value


 def concat(dfs):
     """
     Concatenates two Spark dataframes intelligently, adding missing columns with 'null' entry where appropriate.
     https://lab.getbase.com/pandarize-spark-dataframes/

     :param dfs: a list or tuple of two Spark dataframes
     :return: single dataframe consisting of dfs' columns and data
     """
     return reduce(concatTwoDfs, dfs)


 def combine_rows(row1, row2):
     """
     Takes two rows assumed to have the same columns, combines them, using values from row1 when available, from row2
     otherwise.

     :param row1: pyspark.sql.Row
     :param row2: pyspark.sql.Row
     :return: pyspark.sql.Row combined from row1 and row2
     """
     from pyspark.sql import Row
     combined = 
     for col in row1.asDict():
         if row1.asDict()[col] is not None:
             combined[col] = row1.asDict()[col]
         else:
             combined[col] = row2.asDict()[col]
     return Row(**combined)


 def remove_nones(row):
     """
     Takes in a row, returns that same row minus all of the columns that have a None entry.  This is required in
     order to create a new DataFrame using only this row; DataFrame will not be created if it doesn't know what kind
     of value to expect in a column.

     :param row:
     :return:
     """
     from pyspark.sql import Row
     cleaned = 
     for col in row.asDict():
         if row.asDict()[col] is not None:
             cleaned[col] = row.asDict()[col]
     return Row(**cleaned)


 def reduce_by(df, col, func):
     """
     Does pretty much the same thing as an RDD's reduceByKey, but much more generic.  Kind of like a Spark DataFrame's
     groupBy, but lets you aggregate by any generic function.

     :param df: the DataFrame to be reduced
     :param col: the column you want to use for grouping in df
     :param func: the function you will use to reduce df
     :return: a reduced DataFrame
     """
     first_loop = True
     unique_entries = df.select(col).distinct().collect()
     return_rdd = sc.parallelize([])
     for entry in unique_entries:
         if first_loop:
             return_df = sqlContext.createDataFrame( \
                                 sc.parallelize([remove_nones(df.filter(df[col] == entry[0]).rdd.reduce(func))]))
             first_loop = False
         else:
             return_df = concat((return_df, \
                                sqlContext.createDataFrame( \
                                 sc.parallelize([remove_nones(df.filter(df[col] == entry[0]).rdd.reduce(func))]))))
     return return_df

然后你通过创建一个名为 test_df 的 DataFrame 并运行它来启动它:

reduce_by(test_df, 'name', combine_rows).show()

【问题讨论】:

我投票结束这个问题,因为它属于Code Review Stack Exchange。 当然@zero323。我在这里还是个新手(Python 新手,Spark 新手,Stack Overflow 新手),还在学习事情的发展方向。我去那边问问我应该做些什么来结束这个问题? 我不知道。对于重复项,您可以简单地批准,但我不确定其他近距离投票是否有类似的事情。关于您的问题,可以定义 UDAF。我提供了some examples on SO。 【参考方案1】:

我认为对于您的特定聚合需求,这也可以:

from pyspark.sql import SQLContext

data = sc.parallelize([("2000-01-01", "blue", "Alice", "allyblue", None, None),\
                      ("1999-12-31", None, "Bob", None, "BobbyG", "Gangsters Paradise"),\
                         (None, None, "Alice", None, None, "Rolling in the Deep") ])

df = sqlContext.createDataFrame(\
data, ["birthdate", "favecolor", "name", "twitterhandle", "facebookpage", "favesong"])

df = df.groupBy(df.name).agg('birthdate': 'min', 'favecolor':'min', \
                        'twitterhandle':'min', 'facebookpage':'min', 'favesong':'min')
print df.collect()

[Row(name=u'Alice', min(favesong)=u'Rolling in the Deep',
min(twitterhandle)=u'allyblue', min(favecolor)=u'blue', 
min(facebookpage)=u'null', min(birthdate)=u'2000-01-01'), Row(name=u'Bob',
min(favesong)=u'Gangsters Paradise', min(twitterhandle)=u'null', 
min(favecolor)=u'null', min(facebookpage)=u'BobbyG', min(birthdate)=u'1999-12-31')]

【讨论】:

非常感谢。我不知道我可以那样使用“min”。将来我会牢记这一点。虽然现在我将按照@zero323 的建议前往 Code Review Stack Exchange,我会看看那里是否有人知道使用任意函数从 DataFrame 聚合分组数据的更好方法。

以上是关于Spark DataFrame 的通用“reduceBy”或“groupBy + aggregate”功能的主要内容,如果未能解决你的问题,请参考以下文章

spark利用sparkSQL将数据写入hive两种通用方式实现及比较

spark利用sparkSQL将数据写入hive两种通用方式实现及比较

java的怎么操作spark的dataframe

[Spark][Python][DataFrame][SQL]Spark对DataFrame直接执行SQL处理的例子

spark dataframe 怎么去除第一行数据

如何打印 spark dataframe