如何使用 sqlContext 计算累积和

Posted

技术标签:

【中文标题】如何使用 sqlContext 计算累积和【英文标题】:How to calculate cumulative sum using sqlContext 【发布时间】:2016-01-11 16:18:19 【问题描述】:

我知道我们可以使用Window function in pyspark 来计算累积和。但 Window 仅在 HiveContext 中受支持,在 SQLContext 中不支持。我需要使用 SQLContext,因为 HiveContext 不能在多进程中运行。

有没有使用 SQLContext 计算累积和的有效方法?一种简单的方法是将数据加载到驱动程序的内存中并使用numpy.cumsum,但缺点是数据需要能够放入内存中

【问题讨论】:

需要使用 SQLContext,因为 HiveContext 不能在多进程中运行 - 嗯?您愿意详细说明吗? 我已经广泛使用了带有 sqlContext 的窗口函数。 @zero323 HiveContext 的限制。我面临与mail-archives.apache.org/mod_mbox/incubator-spark-user/… 相同的问题 不是HiveContext的限制。您只需将嵌入式 Derby 用作不用于生产的元存储。请参阅我对***.com/q/34705886/1560062的回答 无需更改 Spark 代码。但是你需要一些 DevOps 技能。 【参考方案1】:

不确定这是否是您要查找的内容,但这里有两个示例如何使用 sqlContext 计算累积和:

首先,当您想按某些类别对其进行分区时:

from pyspark.sql.types import StructType, StringType, LongType
from pyspark.sql import SQLContext

rdd = sc.parallelize([
    ("Tablet", 6500), 
    ("Tablet", 5500), 
    ("Cell Phone", 6000), 
    ("Cell Phone", 6500), 
    ("Cell Phone", 5500)
    ])

schema = StructType([
    StructField("category", StringType(), False),
    StructField("revenue", LongType(), False)
    ])

df = sqlContext.createDataFrame(rdd, schema)

df.registerTempTable("test_table")

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")

输出:

[Row(category='Tablet', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=6500, cumsum=12000),
 Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Cell Phone', revenue=6000, cumsum=11500),
 Row(category='Cell Phone', revenue=6500, cumsum=18000)]

第二个,当你只想取一个变量的 cumsum 时。将 df2 更改为:

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")

输出:

[Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=5500, cumsum=11000),
 Row(category='Cell Phone', revenue=6000, cumsum=17000),
 Row(category='Cell Phone', revenue=6500, cumsum=23500),
 Row(category='Tablet', revenue=6500, cumsum=30000)]

希望这会有所帮助。收集数据后使用 np.cumsum 效率不是很高,尤其是在数据集很大的情况下。您可以探索的另一种方法是使用简单的 RDD 转换,如 groupByKey(),然后使用 map 通过某个键计算每个组的累积和,然后在最后减少它。

【讨论】:

谢谢,但您的解决方案适用于 hiveContext,而不是 sqlContext。你能输出你的sqlContext吗?它应该表明它是一个 hiveContext【参考方案2】:

这是一个简单的例子:

import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf


sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)

data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20),
                                   ("Cam", "F", "Cambridge", 1, 25),
                                  ("Lin", "F", "Cambridge", 1, 25),
                                  ("Cat", "M", "Boston", 1, 20),
                                  ("Sara", "F", "Cambridge", 1, 15),
                                  ("Jeff", "M", "Cambridge", 1, 25),
                                  ("Bean", "M", "Cambridge", 1, 26),
                                  ("Dave", "M", "Cambridge", 1, 21),], 
                                 ["name", 'gender', "city", 'donation', "age"])


data.show()

给出输出

+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+

定义一个窗口

win_spec = (window.Window
                  .partitionBy(['gender', 'city'])
                  .rowsBetween(window.Window.unboundedPreceding, 0))

# window.Window.unboundedPreceding -- 组的第一行 # .rowsBetween(..., 0) -- 0 引用当前行,如果指定 -2 则在当前行之前最多 2 行

现在,这是一个陷阱:

temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

有错误:

TypeErrorTraceback (most recent call last)
<ipython-input-9-b467d24b05cd> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
    238 
    239     def __iter__(self):
--> 240         raise TypeError("Column is not iterable")
    241 
    242     # string methods

TypeError: Column is not iterable

这是由于使用了 python 的 sum 函数而不是 pyspark's。解决这个问题的方法是使用来自pyspark.sql.functions.sumsum 函数:

temp = data.withColumn('AgeSum',sf.sum(data.donation).over(win_spec))
temp.show()

会给:

+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+

【讨论】:

win_spec 未在您的示例中定义,您可以添加它吗?对理解你的好例子最有帮助 哎呀我的坏@Mike 会尝试挖掘我的代码库;)手指交叉【参考方案3】:

登陆该线程并尝试解决类似问题后,我已使用此代码解决了我的问题。不确定我是否缺少 OP 的一部分,但这是对 SQLContext 列求和的一种方式:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext

sc = SparkContext() 
sc.setLogLevel("ERROR")
conf = SparkConf()
conf.setAppName('Sum SQLContext Column')
conf.set("spark.executor.memory", "2g")
sqlContext = SQLContext(sc)

def sum_column(table, column):
    sc_table = sqlContext.table(table)
    return sc_table.agg(column: "sum")

sum_column("db.tablename", "column").show()

【讨论】:

【参考方案4】:

windows 函数仅适用于 HiveContext 是不正确的。您甚至可以在 sqlContext 中使用它们:

from pyspark.sql.window import *

myPartition=Window.partitionBy(['col1','col2','col3'])

temp= temp.withColumn("#dummy",sum(temp.col4).over(myPartition))

【讨论】:

只有在 spark 2.0+ 上才能使用带有 SQLContext 的窗口函数。对于 Spark 版本 1.4 ~ 1.6,需要使用 HiveContext 不,它们是从 spark 版本 1.4 引入的 它们从 1.4 开始就存在,但在 Spark 2 之前,必须使用 HiveContext。但是,在许多发行版中,spark-shell 和 pyspark 中“sqlContext”实例的默认类实际上是 HiveContext,因此这可能会引起一些混淆,人们会认为可以将窗口函数与正常的 SQLContext。您可以参考这个问题了解更多信息:***.com/questions/36171349/…

以上是关于如何使用 sqlContext 计算累积和的主要内容,如果未能解决你的问题,请参考以下文章

pyspark中的累积和

如何在 LINQ 中计算累积和?

POWER BI 如何计算不唯一选项的累积占比

Anylogic:如何计算累积和?

如何复制累积字段

如何在python中计算正态累积分布函数的倒数?